diff --git a/CHANGELOG.md b/CHANGELOG.md index d2b4e116108..fa5d093b2b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ Docs: https://docs.openclaw.ai ### Fixes - Browser/Playwright: ignore benign already-handled route races during guarded navigation so browser-page tasks no longer fail when Playwright tears down a route mid-flight. (#68708) Thanks @Steady-ai. +- Telegram: prevent duplicate in-process long pollers for the same bot token and add clearer `getUpdates` conflict diagnostics for external duplicate pollers. Fixes #56230. - Browser/Linux: detect Chromium-based installs under `/opt/google`, `/opt/brave.com`, `/usr/lib/chromium`, and `/usr/lib/chromium-browser` before asking users to set `browser.executablePath`. (#48563) Thanks @lupuletic. - Sessions/browser: close tracked browser tabs when idle, daily, `/new`, or `/reset` session rollover archives the previous transcript, preventing tabs from leaking past the old session. Thanks @jakozloski. - Sessions/forking: fall back to transcript-estimated parent token counts when cached totals are stale or missing, so oversized thread forks start fresh instead of cloning the full parent transcript. Thanks @jalehman. diff --git a/docs/channels/telegram.md b/docs/channels/telegram.md index a787e3a82b3..e0c5611d5bd 100644 --- a/docs/channels/telegram.md +++ b/docs/channels/telegram.md @@ -257,6 +257,7 @@ curl "https://api.telegram.org/bot/getUpdates" - Group sessions are isolated by group ID. Forum topics append `:topic:` to keep topics isolated. - DM messages can carry `message_thread_id`; OpenClaw routes them with thread-aware session keys and preserves thread ID for replies. - Long polling uses grammY runner with per-chat/per-thread sequencing. Overall runner sink concurrency uses `agents.defaults.maxConcurrent`. +- Long polling is guarded inside each gateway process so only one active poller can use a bot token at a time. If you still see `getUpdates` 409 conflicts, another OpenClaw gateway, script, or external poller is likely using the same token. - Long-polling watchdog restarts trigger after 120 seconds without completed `getUpdates` liveness by default. Increase `channels.telegram.pollingStallThresholdMs` only if your deployment still sees false polling-stall restarts during long-running work. The value is in milliseconds and is allowed from `30000` to `600000`; per-account overrides are supported. - Telegram Bot API has no read-receipt support (`sendReadReceipts` does not apply). diff --git a/extensions/telegram/src/monitor.test.ts b/extensions/telegram/src/monitor.test.ts index 7badd791fed..76f636dc79c 100644 --- a/extensions/telegram/src/monitor.test.ts +++ b/extensions/telegram/src/monitor.test.ts @@ -2,6 +2,7 @@ import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vite type MonitorTelegramOpts = import("./monitor.js").MonitorTelegramOpts; let monitorTelegramProvider: typeof import("./monitor.js").monitorTelegramProvider; +let resetTelegramPollingLeasesForTests: typeof import("./polling-lease.js").resetTelegramPollingLeasesForTests; type MockCtx = { message: { @@ -337,9 +338,11 @@ describe("monitorTelegramProvider (grammY)", () => { beforeAll(async () => { ({ monitorTelegramProvider } = await import("./monitor.js")); + ({ resetTelegramPollingLeasesForTests } = await import("./polling-lease.js")); }); beforeEach(() => { + resetTelegramPollingLeasesForTests(); loadConfig.mockReturnValue({ agents: { defaults: { maxConcurrent: 2 } }, channels: { telegram: {} }, @@ -519,6 +522,69 @@ describe("monitorTelegramProvider (grammY)", () => { expect(createdBotStops[0]).toHaveBeenCalledTimes(1); }); + it("refuses a concurrent same-token polling monitor before starting another runner", async () => { + const abort = new AbortController(); + const firstCycle = mockRunOnceWithStalledPollingRunner(); + + const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); + await firstCycle.waitForRunStart(); + + await expect(monitorTelegramProvider({ token: "tok" })).rejects.toThrow( + "refusing duplicate poller", + ); + expect(runSpy).toHaveBeenCalledTimes(1); + + abort.abort(); + await monitor; + }); + + it("allows concurrent polling monitors for different bot tokens", async () => { + const firstAbort = new AbortController(); + const secondAbort = new AbortController(); + const firstCycle = mockRunOnceWithStalledPollingRunner(); + const secondCycle = mockRunOnceWithStalledPollingRunner(); + + const firstMonitor = monitorTelegramProvider({ + token: "tok-a", + abortSignal: firstAbort.signal, + }); + await firstCycle.waitForRunStart(); + const secondMonitor = monitorTelegramProvider({ + token: "tok-b", + abortSignal: secondAbort.signal, + }); + await secondCycle.waitForRunStart(); + + expect(runSpy).toHaveBeenCalledTimes(2); + + firstAbort.abort(); + secondAbort.abort(); + await Promise.all([firstMonitor, secondMonitor]); + }); + + it("starts a same-token replacement after the previous monitor releases", async () => { + const firstAbort = new AbortController(); + const secondAbort = new AbortController(); + const firstCycle = mockRunOnceWithStalledPollingRunner(); + + const firstMonitor = monitorTelegramProvider({ + token: "tok", + abortSignal: firstAbort.signal, + }); + await firstCycle.waitForRunStart(); + firstAbort.abort(); + + const secondCycle = mockRunOnceAndAbort(secondAbort); + const secondMonitor = monitorTelegramProvider({ + token: "tok", + abortSignal: secondAbort.signal, + }); + await secondCycle.waitForRunStart(); + await Promise.all([firstMonitor, secondMonitor]); + + expect(runSpy).toHaveBeenCalledTimes(2); + }); + it("clears bounded cleanup timers after a clean stop", async () => { vi.useFakeTimers(); try { diff --git a/extensions/telegram/src/monitor.ts b/extensions/telegram/src/monitor.ts index 1691607d60b..cec22f6910c 100644 --- a/extensions/telegram/src/monitor.ts +++ b/extensions/telegram/src/monitor.ts @@ -17,6 +17,7 @@ import { isRecoverableTelegramNetworkError, isTelegramPollingNetworkError, } from "./network-errors.js"; +import { acquireTelegramPollingLease } from "./polling-lease.js"; import { makeProxyFetch } from "./proxy.js"; export type { MonitorTelegramOpts } from "./monitor.types.js"; @@ -161,76 +162,96 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { const { TelegramPollingSession, readTelegramUpdateOffset, writeTelegramUpdateOffset } = await loadTelegramMonitorPollingRuntime(); - if (isTelegramExecApprovalHandlerConfigured({ cfg, accountId: account.accountId })) { - registerChannelRuntimeContext({ - channelRuntime: opts.channelRuntime, - channelId: "telegram", - accountId: account.accountId, - capability: CHANNEL_APPROVAL_NATIVE_RUNTIME_CONTEXT_CAPABILITY, - context: { token }, - abortSignal: opts.abortSignal, - }); - } - - const persistedOffsetRaw = await readTelegramUpdateOffset({ + const pollingLease = await acquireTelegramPollingLease({ + token, accountId: account.accountId, - botToken: token, + abortSignal: opts.abortSignal, }); - let lastUpdateId = normalizePersistedUpdateId(persistedOffsetRaw); - if (persistedOffsetRaw !== null && lastUpdateId === null) { + if (pollingLease.waitedForPrevious) { log( - `[telegram] Ignoring invalid persisted update offset (${String(persistedOffsetRaw)}); starting without offset confirmation.`, + `[telegram][diag] waited for previous polling session for bot token ${pollingLease.tokenFingerprint} before starting account "${account.accountId}".`, + ); + } + if (pollingLease.replacedStoppingPrevious) { + log( + `[telegram][diag] previous polling session for bot token ${pollingLease.tokenFingerprint} did not stop within the lease wait; starting a replacement for account "${account.accountId}".`, ); } - const persistUpdateId = async (updateId: number) => { - const normalizedUpdateId = normalizePersistedUpdateId(updateId); - if (normalizedUpdateId === null) { - log(`[telegram] Ignoring invalid update_id value: ${String(updateId)}`); - return; - } - if (lastUpdateId !== null && normalizedUpdateId <= lastUpdateId) { - return; - } - lastUpdateId = normalizedUpdateId; - try { - await writeTelegramUpdateOffset({ + try { + if (isTelegramExecApprovalHandlerConfigured({ cfg, accountId: account.accountId })) { + registerChannelRuntimeContext({ + channelRuntime: opts.channelRuntime, + channelId: "telegram", accountId: account.accountId, - updateId: normalizedUpdateId, - botToken: token, + capability: CHANNEL_APPROVAL_NATIVE_RUNTIME_CONTEXT_CAPABILITY, + context: { token }, + abortSignal: opts.abortSignal, }); - } catch (err) { - (opts.runtime?.error ?? console.error)( - `telegram: failed to persist update offset: ${String(err)}`, + } + + const persistedOffsetRaw = await readTelegramUpdateOffset({ + accountId: account.accountId, + botToken: token, + }); + let lastUpdateId = normalizePersistedUpdateId(persistedOffsetRaw); + if (persistedOffsetRaw !== null && lastUpdateId === null) { + log( + `[telegram] Ignoring invalid persisted update offset (${String(persistedOffsetRaw)}); starting without offset confirmation.`, ); } - }; - // Preserve sticky IPv4 fallback state across clean/conflict restarts. - // Dirty polling cycles rebuild transport inside TelegramPollingSession. - const createTelegramTransportForPolling = () => - resolveTelegramTransport(proxyFetch, { - network: account.config.network, + const persistUpdateId = async (updateId: number) => { + const normalizedUpdateId = normalizePersistedUpdateId(updateId); + if (normalizedUpdateId === null) { + log(`[telegram] Ignoring invalid update_id value: ${String(updateId)}`); + return; + } + if (lastUpdateId !== null && normalizedUpdateId <= lastUpdateId) { + return; + } + lastUpdateId = normalizedUpdateId; + try { + await writeTelegramUpdateOffset({ + accountId: account.accountId, + updateId: normalizedUpdateId, + botToken: token, + }); + } catch (err) { + (opts.runtime?.error ?? console.error)( + `telegram: failed to persist update offset: ${String(err)}`, + ); + } + }; + + // Preserve sticky IPv4 fallback state across clean/conflict restarts. + // Dirty polling cycles rebuild transport inside TelegramPollingSession. + const createTelegramTransportForPolling = () => + resolveTelegramTransport(proxyFetch, { + network: account.config.network, + }); + const telegramTransport = createTelegramTransportForPolling(); + + pollingSession = new TelegramPollingSession({ + token, + config: cfg, + accountId: account.accountId, + runtime: opts.runtime, + proxyFetch, + abortSignal: opts.abortSignal, + runnerOptions: createTelegramRunnerOptions(cfg), + getLastUpdateId: () => lastUpdateId, + persistUpdateId, + log, + telegramTransport, + createTelegramTransport: createTelegramTransportForPolling, + stallThresholdMs: account.config.pollingStallThresholdMs, + setStatus: opts.setStatus, }); - const telegramTransport = createTelegramTransportForPolling(); - - pollingSession = new TelegramPollingSession({ - token, - config: cfg, - accountId: account.accountId, - runtime: opts.runtime, - proxyFetch, - abortSignal: opts.abortSignal, - runnerOptions: createTelegramRunnerOptions(cfg), - getLastUpdateId: () => lastUpdateId, - persistUpdateId, - log, - telegramTransport, - createTelegramTransport: createTelegramTransportForPolling, - stallThresholdMs: account.config.pollingStallThresholdMs, - setStatus: opts.setStatus, - }); - await pollingSession.runUntilAbort(); + await pollingSession.runUntilAbort(); + } finally { + pollingLease.release(); + } } finally { unregisterHandler(); } diff --git a/extensions/telegram/src/polling-lease.test.ts b/extensions/telegram/src/polling-lease.test.ts new file mode 100644 index 00000000000..93744a6bb7b --- /dev/null +++ b/extensions/telegram/src/polling-lease.test.ts @@ -0,0 +1,102 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { + acquireTelegramPollingLease, + resetTelegramPollingLeasesForTests, +} from "./polling-lease.js"; + +describe("Telegram polling lease", () => { + beforeEach(() => { + resetTelegramPollingLeasesForTests(); + }); + + it("refuses an active duplicate poller for the same bot token", async () => { + const first = await acquireTelegramPollingLease({ + token: "123:abc", + accountId: "default", + }); + + await expect( + acquireTelegramPollingLease({ + token: "123:abc", + accountId: "ops", + }), + ).rejects.toThrow('refusing duplicate poller for account "ops"'); + + first.release(); + }); + + it("allows concurrent pollers for different bot tokens", async () => { + const first = await acquireTelegramPollingLease({ + token: "123:abc", + accountId: "default", + }); + const second = await acquireTelegramPollingLease({ + token: "456:def", + accountId: "ops", + }); + + expect(first.tokenFingerprint).not.toBe(second.tokenFingerprint); + + first.release(); + second.release(); + }); + + it("waits for an aborting same-token poller before acquiring", async () => { + const oldAbort = new AbortController(); + const first = await acquireTelegramPollingLease({ + token: "123:abc", + accountId: "default", + abortSignal: oldAbort.signal, + }); + oldAbort.abort(); + + const acquire = acquireTelegramPollingLease({ + token: "123:abc", + accountId: "default", + waitMs: 1_000, + }); + await Promise.resolve(); + first.release(); + const second = await acquire; + + expect(second.waitedForPrevious).toBe(true); + expect(second.replacedStoppingPrevious).toBe(false); + + second.release(); + }); + + it("does not let stale release clear a replacement lease", async () => { + vi.useFakeTimers(); + try { + const oldAbort = new AbortController(); + const first = await acquireTelegramPollingLease({ + token: "123:abc", + accountId: "old", + abortSignal: oldAbort.signal, + }); + oldAbort.abort(); + + const acquireReplacement = acquireTelegramPollingLease({ + token: "123:abc", + accountId: "new", + waitMs: 10, + }); + await vi.advanceTimersByTimeAsync(10); + const replacement = await acquireReplacement; + expect(replacement.replacedStoppingPrevious).toBe(true); + + first.release(); + + await expect( + acquireTelegramPollingLease({ + token: "123:abc", + accountId: "third", + }), + ).rejects.toThrow('account "new"'); + + replacement.release(); + } finally { + vi.useRealTimers(); + } + }); +}); diff --git a/extensions/telegram/src/polling-lease.ts b/extensions/telegram/src/polling-lease.ts new file mode 100644 index 00000000000..a29476dd1fe --- /dev/null +++ b/extensions/telegram/src/polling-lease.ts @@ -0,0 +1,193 @@ +import { createHash } from "node:crypto"; + +const TELEGRAM_POLLING_LEASES_KEY = Symbol.for("openclaw.telegram.pollingLeases"); +const DEFAULT_TELEGRAM_POLLING_LEASE_WAIT_MS = 5_000; + +type TelegramPollingLeaseEntry = { + accountId: string; + abortSignal?: AbortSignal; + done: Promise; + owner: symbol; + resolveDone: () => void; + startedAt: number; +}; + +type TelegramPollingLeaseRegistry = Map; + +export type TelegramPollingLease = { + tokenFingerprint: string; + waitedForPrevious: boolean; + replacedStoppingPrevious: boolean; + release: () => void; +}; + +type AcquireTelegramPollingLeaseOpts = { + token: string; + accountId: string; + abortSignal?: AbortSignal; + waitMs?: number; +}; + +type WaitForPreviousResult = "released" | "timeout" | "aborted"; + +function pollingLeaseRegistry(): TelegramPollingLeaseRegistry { + const proc = process as NodeJS.Process & { + [TELEGRAM_POLLING_LEASES_KEY]?: TelegramPollingLeaseRegistry; + }; + proc[TELEGRAM_POLLING_LEASES_KEY] ??= new Map(); + return proc[TELEGRAM_POLLING_LEASES_KEY]; +} + +function tokenFingerprint(token: string): string { + return createHash("sha256").update(token).digest("hex").slice(0, 16); +} + +function createDuplicatePollingError(params: { + accountId: string; + existing: TelegramPollingLeaseEntry; + tokenFingerprint: string; +}): Error { + const ageMs = Math.max(0, Date.now() - params.existing.startedAt); + const ageSeconds = Math.round(ageMs / 1000); + return new Error( + `Telegram polling already active for bot token ${params.tokenFingerprint} on account "${params.existing.accountId}" (${ageSeconds}s old); refusing duplicate poller for account "${params.accountId}". Stop the existing OpenClaw gateway/poller or use a different bot token.`, + ); +} + +async function waitForPreviousRelease(params: { + done: Promise; + signal?: AbortSignal; + waitMs: number; +}): Promise { + if (params.signal?.aborted) { + return "aborted"; + } + + let timer: ReturnType | undefined; + let abortListener: (() => void) | undefined; + try { + const timeout = new Promise<"timeout">((resolve) => { + timer = setTimeout(() => resolve("timeout"), Math.max(0, params.waitMs)); + timer.unref?.(); + }); + const aborted = new Promise<"aborted">((resolve) => { + abortListener = () => resolve("aborted"); + params.signal?.addEventListener("abort", abortListener, { once: true }); + }); + const released = params.done.then(() => "released" as const); + return await Promise.race([released, timeout, aborted]); + } finally { + if (timer) { + clearTimeout(timer); + } + if (abortListener) { + params.signal?.removeEventListener("abort", abortListener); + } + } +} + +function createLease(params: { + accountId: string; + abortSignal?: AbortSignal; + registry: TelegramPollingLeaseRegistry; + tokenFingerprint: string; + waitedForPrevious: boolean; + replacedStoppingPrevious: boolean; +}): TelegramPollingLease { + let resolveDone!: () => void; + const done = new Promise((resolve) => { + resolveDone = resolve; + }); + const owner = Symbol(`telegram-polling:${params.accountId}`); + const entry: TelegramPollingLeaseEntry = { + accountId: params.accountId, + abortSignal: params.abortSignal, + done, + owner, + resolveDone, + startedAt: Date.now(), + }; + params.registry.set(params.tokenFingerprint, entry); + + let released = false; + return { + tokenFingerprint: params.tokenFingerprint, + waitedForPrevious: params.waitedForPrevious, + replacedStoppingPrevious: params.replacedStoppingPrevious, + release: () => { + if (released) { + return; + } + released = true; + const current = params.registry.get(params.tokenFingerprint); + if (current?.owner === owner) { + params.registry.delete(params.tokenFingerprint); + } + resolveDone(); + }, + }; +} + +export async function acquireTelegramPollingLease( + opts: AcquireTelegramPollingLeaseOpts, +): Promise { + const registry = pollingLeaseRegistry(); + const fingerprint = tokenFingerprint(opts.token); + const waitMs = opts.waitMs ?? DEFAULT_TELEGRAM_POLLING_LEASE_WAIT_MS; + let waitedForPrevious = false; + + for (;;) { + const existing = registry.get(fingerprint); + if (!existing) { + return createLease({ + accountId: opts.accountId, + abortSignal: opts.abortSignal, + registry, + tokenFingerprint: fingerprint, + waitedForPrevious, + replacedStoppingPrevious: false, + }); + } + + if (!existing.abortSignal?.aborted) { + throw createDuplicatePollingError({ + accountId: opts.accountId, + existing, + tokenFingerprint: fingerprint, + }); + } + + waitedForPrevious = true; + const waitResult = await waitForPreviousRelease({ + done: existing.done, + signal: opts.abortSignal, + waitMs, + }); + if (waitResult === "aborted") { + throw new Error( + `Telegram polling start aborted while waiting for previous poller for bot token ${fingerprint} to stop.`, + ); + } + + const current = registry.get(fingerprint); + if (current !== existing) { + continue; + } + if (waitResult === "released") { + continue; + } + + return createLease({ + accountId: opts.accountId, + abortSignal: opts.abortSignal, + registry, + tokenFingerprint: fingerprint, + waitedForPrevious, + replacedStoppingPrevious: true, + }); + } +} + +export function resetTelegramPollingLeasesForTests(): void { + pollingLeaseRegistry().clear(); +} diff --git a/extensions/telegram/src/polling-session.test.ts b/extensions/telegram/src/polling-session.test.ts index 2d2666e9642..2150fb2c332 100644 --- a/extensions/telegram/src/polling-session.test.ts +++ b/extensions/telegram/src/polling-session.test.ts @@ -953,6 +953,32 @@ describe("TelegramPollingSession", () => { expect(transport2.close).toHaveBeenCalledTimes(1); }); + it("logs an actionable duplicate-poller hint for getUpdates conflicts", async () => { + const abort = new AbortController(); + const log = vi.fn(); + const conflictError = Object.assign( + new Error("Conflict: terminated by other getUpdates request"), + { + error_code: 409, + method: "getUpdates", + }, + ); + createTelegramBotMock.mockReturnValueOnce(makeBot()).mockReturnValueOnce(makeBot()); + isRecoverableTelegramNetworkErrorMock.mockReturnValue(false); + mockRestartAfterPollingError(conflictError, abort); + + const session = createPollingSession({ + abortSignal: abort.signal, + log, + }); + + await session.runUntilAbort(); + + expect(log).toHaveBeenCalledWith( + expect.stringContaining("Another OpenClaw gateway, script, or Telegram poller"), + ); + }); + it("closes the transport once when runUntilAbort exits normally", async () => { const abort = new AbortController(); const transport = makeTelegramTransport(); diff --git a/extensions/telegram/src/polling-session.ts b/extensions/telegram/src/polling-session.ts index c69d7e9d432..bd80a91abb9 100644 --- a/extensions/telegram/src/polling-session.ts +++ b/extensions/telegram/src/polling-session.ts @@ -381,11 +381,14 @@ export class TelegramPollingSession { } const reason = isConflict ? "getUpdates conflict" : "network error"; const errMsg = formatErrorMessage(err); + const conflictHint = isConflict + ? " Another OpenClaw gateway, script, or Telegram poller may be using this bot token; stop the duplicate poller or switch this account to webhook mode." + : ""; this.opts.log( - `[telegram][diag] polling cycle error reason=${reason} ${liveness.formatDiagnosticFields("lastGetUpdatesError")} err=${errMsg}`, + `[telegram][diag] polling cycle error reason=${reason} ${liveness.formatDiagnosticFields("lastGetUpdatesError")} err=${errMsg}${conflictHint}`, ); const shouldRestart = await this.#waitBeforeRestart( - (delay) => `Telegram ${reason}: ${errMsg}; retrying in ${delay}.`, + (delay) => `Telegram ${reason}: ${errMsg};${conflictHint} retrying in ${delay}.`, ); return shouldRestart ? "continue" : "exit"; } finally {