From 25a8f5f3f8529fbecdadd3447147b4886e491aef Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 15 May 2026 20:35:34 +0100 Subject: [PATCH] fix: surface stalled telegram ingress backlog --- CHANGELOG.md | 1 + .../telegram/src/polling-session.test.ts | 309 ++++++++++++++++++ extensions/telegram/src/polling-session.ts | 116 ++++++- extensions/telegram/src/polling-status.ts | 7 + extensions/telegram/src/status-issues.ts | 12 +- extensions/telegram/src/status.test.ts | 25 ++ 6 files changed, 450 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f29385ec6e6..32575481647 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ Docs: https://docs.openclaw.ai - Control UI/WebChat: keep optimistic image messages from embedding large inline `data:` previews and preserve image-only user turns in chat history, avoiding browser stack overflows when sending image attachments. Fixes #82182. Thanks @ExploreSheep. - Agents/media: preserve message-tool-only delivery for generated music and video completion handoffs, so group/channel completions do not finish without posting the generated attachment. - Telegram: drain queued outbound deliveries after polling reconnect confirms fresh `getUpdates` activity, so stale-socket and network recovery do not leave failed replies stranded. Fixes #50040. Refs #82175. Thanks @dmitriiforpost-commits and @shellyrocklobster. +- Telegram: mark isolated polling ingress unhealthy when a spooled inbound backlog stalls while Bot API polling still succeeds, so gateway/channel health no longer stays green after Telegram DM processing wedges. Fixes #82175. Thanks @shellyrocklobster. - Agents: strip Gemini/Gemma `` tags with attributes or self-closing syntax from delivered replies, including strict final-tag streaming enforcement. Fixes #65867. - macOS/update: disarm legacy `ai.openclaw.update.*` LaunchAgents when `openclaw update` starts from one, preventing KeepAlive relaunch loops that repeatedly restart the Gateway and replay update continuations. Fixes #82167. - Agents/replay: strip internal runtime-context metadata and `NO_REPLY` sentinels from provider replay and pending final-delivery recovery so restart and heartbeat resumes do not feed control text back to the model. Fixes #76629. Thanks @fuyizheng3120, @bryan-chx, and @cael-dandelion-cult. diff --git a/extensions/telegram/src/polling-session.test.ts b/extensions/telegram/src/polling-session.test.ts index c3d99b310fe..636fae94cef 100644 --- a/extensions/telegram/src/polling-session.test.ts +++ b/extensions/telegram/src/polling-session.test.ts @@ -71,6 +71,12 @@ type DrainPendingDeliveriesCall = { now: number, ) => { match: boolean; bypassBackoff: boolean }; }; +type WorkerPollSuccessListener = (message: { + type: "poll-success"; + offset: null; + count: number; + finishedAt: number; +}) => void; type AsyncVoidFn = () => Promise; type MockCallSource = { mock: { calls: Array> } }; @@ -882,6 +888,309 @@ describe("TelegramPollingSession", () => { } }); + it("keeps active spooled lanes blocked across isolated ingress restarts", async () => { + vi.useFakeTimers({ shouldAdvanceTime: true }); + const abort = new AbortController(); + const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-telegram-spool-")); + let releaseRegularTurn: (() => void) | undefined; + const regularTurnDone = new Promise((resolve) => { + releaseRegularTurn = resolve; + }); + const handleUpdate = vi.fn(async () => { + await regularTurnDone; + }); + createTelegramBotMock.mockImplementation(() => ({ + api: { + deleteWebhook: vi.fn(async () => true), + config: { use: vi.fn() }, + }, + init: vi.fn(async () => undefined), + handleUpdate, + stop: vi.fn(async () => undefined), + })); + await writeTelegramSpooledUpdate({ + spoolDir: tempDir, + update: { + update_id: 42, + message: { text: "summarize this", chat: { id: -100, type: "supergroup" } }, + }, + }); + + let workerTaskCalls = 0; + let stopWorker: (() => void) | undefined; + const workerDone = new Promise((resolve) => { + stopWorker = resolve; + }); + const createWorker = vi.fn(() => ({ + onMessage: vi.fn(() => () => undefined), + stop: vi.fn(async () => { + stopWorker?.(); + }), + task: vi.fn(async () => { + workerTaskCalls += 1; + if (workerTaskCalls === 1) { + return; + } + await workerDone; + }), + })); + + try { + const session = createPollingSession({ + abortSignal: abort.signal, + isolatedIngress: { + enabled: true, + spoolDir: tempDir, + createWorker, + drainIntervalMs: 100, + }, + }); + + const runPromise = session.runUntilAbort(); + await vi.waitFor(() => expect(handleUpdate).toHaveBeenCalledTimes(1)); + await vi.advanceTimersByTimeAsync(16_000); + await vi.waitFor(() => expect(createWorker).toHaveBeenCalledTimes(2)); + expect(handleUpdate).toHaveBeenCalledTimes(1); + + releaseRegularTurn?.(); + await vi.advanceTimersByTimeAsync(1_000); + await vi.waitFor(async () => + expect( + (await listTelegramSpooledUpdates({ spoolDir: tempDir })).map( + (update) => update.updateId, + ), + ).toEqual([]), + ); + abort.abort(); + await vi.advanceTimersByTimeAsync(20_000); + await runPromise; + } finally { + releaseRegularTurn?.(); + vi.useRealTimers(); + await fs.rm(tempDir, { recursive: true, force: true }); + } + }); + + it("keeps active spooled lanes blocked across account restarts", async () => { + vi.useFakeTimers({ shouldAdvanceTime: true }); + const firstAbort = new AbortController(); + const secondAbort = new AbortController(); + const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-telegram-spool-")); + let releaseRegularTurn: (() => void) | undefined; + const regularTurnDone = new Promise((resolve) => { + releaseRegularTurn = resolve; + }); + const handleUpdate = vi.fn(async () => { + await regularTurnDone; + }); + createTelegramBotMock.mockImplementation(() => ({ + api: { + deleteWebhook: vi.fn(async () => true), + config: { use: vi.fn() }, + }, + init: vi.fn(async () => undefined), + handleUpdate, + stop: vi.fn(async () => undefined), + })); + await writeTelegramSpooledUpdate({ + spoolDir: tempDir, + update: { + update_id: 42, + message: { text: "summarize this", chat: { id: -100, type: "supergroup" } }, + }, + }); + + const createWorker = vi.fn(() => { + let stopWorker: (() => void) | undefined; + const workerDone = new Promise((resolve) => { + stopWorker = resolve; + }); + return { + onMessage: vi.fn(() => () => undefined), + stop: vi.fn(async () => { + stopWorker?.(); + }), + task: vi.fn(async () => { + await workerDone; + }), + }; + }); + + try { + const firstSession = createPollingSession({ + abortSignal: firstAbort.signal, + isolatedIngress: { + enabled: true, + spoolDir: tempDir, + createWorker, + drainIntervalMs: 100, + }, + }); + + const firstRunPromise = firstSession.runUntilAbort(); + await vi.waitFor(() => expect(handleUpdate).toHaveBeenCalledTimes(1)); + firstAbort.abort(); + await vi.advanceTimersByTimeAsync(16_000); + await firstRunPromise; + + const secondSession = createPollingSession({ + abortSignal: secondAbort.signal, + isolatedIngress: { + enabled: true, + spoolDir: tempDir, + createWorker, + drainIntervalMs: 100, + }, + }); + const secondRunPromise = secondSession.runUntilAbort(); + await vi.waitFor(() => expect(createWorker).toHaveBeenCalledTimes(2)); + await vi.advanceTimersByTimeAsync(1_000); + expect(handleUpdate).toHaveBeenCalledTimes(1); + + releaseRegularTurn?.(); + await vi.advanceTimersByTimeAsync(1_000); + await vi.waitFor(async () => + expect( + (await listTelegramSpooledUpdates({ spoolDir: tempDir })).map( + (update) => update.updateId, + ), + ).toEqual([]), + ); + secondAbort.abort(); + await vi.advanceTimersByTimeAsync(20_000); + await secondRunPromise; + } finally { + releaseRegularTurn?.(); + vi.useRealTimers(); + await fs.rm(tempDir, { recursive: true, force: true }); + } + }); + + it("marks isolated ingress unhealthy when a spooled backlog wedges while polling stays live", async () => { + vi.useFakeTimers({ shouldAdvanceTime: true }); + const abort = new AbortController(); + const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-telegram-spool-")); + const log = vi.fn(); + const setStatus = vi.fn(); + let releaseRegularTurn: (() => void) | undefined; + const regularTurnDone = new Promise((resolve) => { + releaseRegularTurn = resolve; + }); + const handleUpdate = vi.fn(async () => { + await regularTurnDone; + }); + createTelegramBotMock.mockImplementation(() => ({ + api: { + deleteWebhook: vi.fn(async () => true), + config: { use: vi.fn() }, + }, + init: vi.fn(async () => undefined), + handleUpdate, + stop: vi.fn(async () => undefined), + })); + for (const updateId of [42, 43]) { + await writeTelegramSpooledUpdate({ + spoolDir: tempDir, + update: { + update_id: updateId, + message: { text: `dm ${updateId}`, chat: { id: 123, type: "private" } }, + }, + }); + } + + const workerListeners: WorkerPollSuccessListener[] = []; + const createWorker = vi.fn(() => { + let stopWorker: (() => void) | undefined; + const workerDone = new Promise((resolve) => { + stopWorker = resolve; + }); + return { + onMessage: vi.fn((listener: WorkerPollSuccessListener) => { + workerListeners.push(listener); + return () => undefined; + }), + stop: vi.fn(async () => { + stopWorker?.(); + }), + task: vi.fn(async () => { + await workerDone; + }), + }; + }); + + try { + const session = createPollingSession({ + abortSignal: abort.signal, + log, + setStatus, + isolatedIngress: { + enabled: true, + spoolDir: tempDir, + createWorker, + drainIntervalMs: 100, + }, + }); + + const runPromise = session.runUntilAbort(); + await vi.waitFor(() => expect(handleUpdate).toHaveBeenCalledTimes(1)); + workerListeners[0]?.({ + type: "poll-success", + offset: null, + count: 0, + finishedAt: Date.now(), + }); + expect(statusPatches(setStatus).some((patch) => patch.connected === true)).toBe(true); + + await vi.advanceTimersByTimeAsync(25 * 60_000 + 100); + + await vi.waitFor(() => + expect(log).toHaveBeenCalledWith( + expect.stringContaining("isolated polling spool backlog stalled"), + ), + ); + expect( + statusPatches(setStatus).some( + (patch) => + patch.connected === false && + String(patch.lastError).includes("isolated polling spool backlog stalled"), + ), + ).toBe(true); + workerListeners[0]?.({ + type: "poll-success", + offset: null, + count: 0, + finishedAt: Date.now(), + }); + expect(statusPatches(setStatus).at(-1)?.connected).toBe(false); + + releaseRegularTurn?.(); + await vi.advanceTimersByTimeAsync(1_000); + await vi.waitFor(async () => + expect( + (await listTelegramSpooledUpdates({ spoolDir: tempDir })).map( + (update) => update.updateId, + ), + ).toEqual([]), + ); + workerListeners[0]?.({ + type: "poll-success", + offset: null, + count: 0, + finishedAt: Date.now(), + }); + await vi.waitFor(() => expect(statusPatches(setStatus).at(-1)?.connected).toBe(true)); + expect(createWorker).toHaveBeenCalledTimes(1); + + abort.abort(); + await vi.advanceTimersByTimeAsync(20_000); + await runPromise; + } finally { + releaseRegularTurn?.(); + vi.useRealTimers(); + await fs.rm(tempDir, { recursive: true, force: true }); + } + }); + it("forces a restart when polling stalls without getUpdates activity", async () => { const abort = new AbortController(); const botStop = vi.fn(async () => undefined); diff --git a/extensions/telegram/src/polling-session.ts b/extensions/telegram/src/polling-session.ts index 49ab1319be9..240ca97d035 100644 --- a/extensions/telegram/src/polling-session.ts +++ b/extensions/telegram/src/polling-session.ts @@ -41,6 +41,7 @@ const MIN_POLL_STALL_THRESHOLD_MS = 30_000; const MAX_POLL_STALL_THRESHOLD_MS = 600_000; const POLL_WATCHDOG_INTERVAL_MS = 30_000; const POLL_STOP_GRACE_MS = 15_000; +const ISOLATED_INGRESS_BACKLOG_STALL_MS = 25 * 60_000; const TELEGRAM_POLLING_CLIENT_TIMEOUT_FLOOR_SECONDS = Math.ceil( TELEGRAM_GET_UPDATES_REQUEST_TIMEOUT_MS / 1000, ); @@ -109,13 +110,34 @@ type TelegramPollingSessionOpts = { }; }; +type SpooledUpdateHandlerState = { + handlerKey: string; + laneKey: string; + task: Promise; + updateId: number; + startedAt: number; +}; + +type SpooledUpdateDrainResult = { + blockedByLane: Set; + started: number; +}; + +// Account health restarts create a new session in the same process while an old +// spooled handler may still be running after shutdown grace. +const activeSpooledUpdateHandlersByLane = new Map(); + +function buildSpooledUpdateHandlerKey(params: { spoolDir: string; laneKey: string }): string { + return `${params.spoolDir}\0${params.laneKey}`; +} + export class TelegramPollingSession { #restartAttempts = 0; #webhookCleared = false; #forceRestarted = false; #activeRunner: ReturnType | undefined; #activeFetchAbort: AbortController | undefined; - #spooledUpdateHandlersByLane = new Map>(); + #spooledUpdateHandlerKeys = new Set(); #transportState: TelegramPollingTransportState; #status: ReturnType; #stallThresholdMs: number; @@ -315,11 +337,19 @@ export class TelegramPollingSession { } async #waitForSpooledUpdateHandlers(): Promise { - await Promise.allSettled(this.#spooledUpdateHandlersByLane.values()); + await Promise.allSettled( + [...this.#spooledUpdateHandlerKeys] + .map((handlerKey) => activeSpooledUpdateHandlersByLane.get(handlerKey)?.task) + .filter((task): task is Promise => Boolean(task)), + ); } - async #drainSpooledUpdates(params: { bot: TelegramBot; spoolDir: string }): Promise { + async #drainSpooledUpdates(params: { + bot: TelegramBot; + spoolDir: string; + }): Promise { const updates = await listTelegramSpooledUpdates({ spoolDir: params.spoolDir, limit: 100 }); + const blockedByLane = new Set(); let started = 0; for (const update of updates) { const laneKey = getTelegramSequentialKey({ @@ -329,20 +359,54 @@ export class TelegramPollingSession { if (this.opts.abortSignal?.aborted) { break; } - if (this.#spooledUpdateHandlersByLane.has(laneKey)) { + const handlerKey = buildSpooledUpdateHandlerKey({ spoolDir: params.spoolDir, laneKey }); + if (activeSpooledUpdateHandlersByLane.has(handlerKey)) { + blockedByLane.add(handlerKey); continue; } const handler = this.#handleSpooledUpdate({ bot: params.bot, update, }); - this.#spooledUpdateHandlersByLane.set(laneKey, handler); + const state: SpooledUpdateHandlerState = { + handlerKey, + laneKey, + task: handler, + updateId: update.updateId, + startedAt: Date.now(), + }; + activeSpooledUpdateHandlersByLane.set(handlerKey, state); + this.#spooledUpdateHandlerKeys.add(handlerKey); void handler.finally(() => { - this.#spooledUpdateHandlersByLane.delete(laneKey); + if (activeSpooledUpdateHandlersByLane.get(handlerKey) === state) { + activeSpooledUpdateHandlersByLane.delete(handlerKey); + } + this.#spooledUpdateHandlerKeys.delete(handlerKey); }); started += 1; } - return started; + return { blockedByLane, started }; + } + + #detectStaleSpooledHandler( + blockedHandlerKeys: Set, + ): (SpooledUpdateHandlerState & { ageMs: number }) | null { + const now = Date.now(); + let stale: (SpooledUpdateHandlerState & { ageMs: number }) | null = null; + for (const handlerKey of blockedHandlerKeys) { + const handler = activeSpooledUpdateHandlersByLane.get(handlerKey); + if (!handler) { + continue; + } + const ageMs = now - handler.startedAt; + if (ageMs <= ISOLATED_INGRESS_BACKLOG_STALL_MS) { + continue; + } + if (!stale || ageMs > stale.ageMs) { + stale = { ...handler, ageMs }; + } + } + return stale; } async #runIsolatedIngressCycle(bot: TelegramBot): Promise<"continue" | "exit"> { @@ -384,6 +448,7 @@ export class TelegramPollingSession { outcome: "not-started", }; let consecutiveDrainFailures = 0; + const stalledBacklogKeys = new Set(); const unsubscribe = worker.onMessage((message) => { if (message.type === "poll-start") { pollState.startedAt = message.startedAt; @@ -393,7 +458,9 @@ export class TelegramPollingSession { return; } if (message.type === "poll-success") { - this.#status.notePollSuccess(message.finishedAt); + if (stalledBacklogKeys.size === 0) { + this.#status.notePollSuccess(message.finishedAt); + } this.#drainPendingDeliveriesAfterReconnect(); pollState.outcome = `ok:${message.count}`; return; @@ -409,14 +476,38 @@ export class TelegramPollingSession { this.opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true }); const drainIntervalMs = Math.max(100, Math.floor(ingress.drainIntervalMs ?? 500)); let drainActive = false; + const stopBot = () => { + return Promise.resolve(bot.stop()) + .then(() => undefined) + .catch(() => { + // Bot may already be stopped by shutdown paths. + }); + }; const drainOnce = async () => { if (drainActive || this.opts.abortSignal?.aborted) { return; } drainActive = true; try { - await this.#drainSpooledUpdates({ bot, spoolDir }); + const drain = await this.#drainSpooledUpdates({ bot, spoolDir }); consecutiveDrainFailures = 0; + for (const handlerKey of [...stalledBacklogKeys]) { + if ( + !activeSpooledUpdateHandlersByLane.has(handlerKey) || + !drain.blockedByLane.has(handlerKey) + ) { + stalledBacklogKeys.delete(handlerKey); + } + } + const staleHandler = this.#detectStaleSpooledHandler(drain.blockedByLane); + if (staleHandler) { + if (!stalledBacklogKeys.has(staleHandler.handlerKey)) { + stalledBacklogKeys.add(staleHandler.handlerKey); + const message = `Telegram isolated polling spool backlog stalled behind update ${staleHandler.updateId} on lane ${staleHandler.laneKey} for ${formatDurationPrecise(staleHandler.ageMs)}; marking polling unhealthy until the backlog drains.`; + this.opts.log(`[telegram] ${message}`); + this.#status.notePollingError(message); + } + } } catch (err) { consecutiveDrainFailures += 1; this.opts.log( @@ -431,13 +522,6 @@ export class TelegramPollingSession { void drainOnce(); }, drainIntervalMs); drainTimer.unref?.(); - const stopBot = () => { - return Promise.resolve(bot.stop()) - .then(() => undefined) - .catch(() => { - // Bot may already be stopped by shutdown paths. - }); - }; try { await worker.task(); if (this.opts.abortSignal?.aborted) { diff --git a/extensions/telegram/src/polling-status.ts b/extensions/telegram/src/polling-status.ts index b9f456306ee..6d43b510c84 100644 --- a/extensions/telegram/src/polling-status.ts +++ b/extensions/telegram/src/polling-status.ts @@ -27,6 +27,13 @@ export function createTelegramPollingStatusPublisher(setStatus?: TelegramPolling lastError: null, }); }, + notePollingError(error: string) { + setStatus?.({ + mode: "polling", + connected: false, + lastError: error, + }); + }, notePollingStop() { setStatus?.({ mode: "polling", diff --git a/extensions/telegram/src/status-issues.ts b/extensions/telegram/src/status-issues.ts index 4d9b41f575e..ab78f87950e 100644 --- a/extensions/telegram/src/status-issues.ts +++ b/extensions/telegram/src/status-issues.ts @@ -69,6 +69,10 @@ function appendTelegramRuntimeError(message: string, lastError: unknown): string return error ? `${message}: ${error}` : message; } +function isTelegramPollingBacklogStallError(lastError: unknown): boolean { + return Boolean(asString(lastError)?.includes("isolated polling spool backlog stalled")); +} + function collectTelegramPollingRuntimeIssues(params: { account: TelegramAccountStatus; accountId: string; @@ -88,14 +92,14 @@ function collectTelegramPollingRuntimeIssues(params: { const withinStartupGrace = lastStartAt != null && now - lastStartAt < TELEGRAM_POLLING_CONNECT_GRACE_MS; if (!withinStartupGrace) { + const message = isTelegramPollingBacklogStallError(account.lastError) + ? "Telegram isolated polling spool backlog is stalled while Bot API polling is still succeeding" + : "Telegram polling is running but has not completed a successful getUpdates call since startup"; issues.push({ channel: "telegram", accountId, kind: "runtime", - message: appendTelegramRuntimeError( - "Telegram polling is running but has not completed a successful getUpdates call since startup", - account.lastError, - ), + message: appendTelegramRuntimeError(message, account.lastError), fix, }); } diff --git a/extensions/telegram/src/status.test.ts b/extensions/telegram/src/status.test.ts index 120406ce065..be0cd687660 100644 --- a/extensions/telegram/src/status.test.ts +++ b/extensions/telegram/src/status.test.ts @@ -119,6 +119,31 @@ describe("collectTelegramStatusIssues", () => { expect(issues[0]?.fix).toContain("channels status --probe"); }); + it("reports isolated polling spool backlog stalls distinctly from startup failures", () => { + const issues = collectTelegramStatusIssues([ + { + accountId: "main", + enabled: true, + configured: true, + running: true, + mode: "polling", + connected: false, + lastStartAt: Date.now() - 121_000, + lastError: + "Telegram isolated polling spool backlog stalled behind update 42 on lane telegram:123 for 1500100ms; marking polling unhealthy until the backlog drains.", + } as ChannelAccountSnapshot, + ]); + + expect(issues).toHaveLength(1); + expectIssueFields(issues[0], { + channel: "telegram", + accountId: "main", + kind: "runtime", + }); + expect(issues[0]?.message).toContain("spool backlog is stalled"); + expect(issues[0]?.message).not.toContain("has not completed a successful getUpdates call"); + }); + it("does not report polling startup before the connect grace expires", () => { const issues = collectTelegramStatusIssues([ {