From 7c79f0ac9cda57b0cd7d63f6f8d92a4400790728 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 28 Apr 2026 02:08:02 +0100 Subject: [PATCH] fix(telegram): centralize update offset tracking --- CHANGELOG.md | 1 + extensions/telegram/src/bot-core.ts | 113 ++------- .../telegram/src/bot-update-tracker.test.ts | 176 +++++++++++++ extensions/telegram/src/bot-update-tracker.ts | 233 ++++++++++++++++++ 4 files changed, 429 insertions(+), 94 deletions(-) create mode 100644 extensions/telegram/src/bot-update-tracker.test.ts create mode 100644 extensions/telegram/src/bot-update-tracker.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index bf3e12b0512..a702d24b083 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ Docs: https://docs.openclaw.ai - CLI/status: show skipped fast-path memory checks as `not checked` and report active custom memory plugin runtime status from `status --json --all` without requiring built-in `agents.defaults.memorySearch`, so plugins such as memory-lancedb-pro and memory-cms no longer look unavailable when their own runtime is healthy. Fixes #56968. Thanks @Tony-ooo and @aderius. - Gateway/channels: record and log unexpected clean channel monitor exits so channels that return without throwing no longer appear stopped with no error. Fixes #73099. Thanks @balaji1968-kingler. +- Channels/Telegram: centralize polling update tracking so accepted offsets remain durable across restarts, same-process handler failures can still retry, and slow offset writes cannot overwrite newer accepted watermarks. Refs #73115. Thanks @vdruts. - Memory/LanceDB: let embedding config use provider-backed auth profiles, environment credentials, or provider config without a separate plugin `embedding.apiKey`, so OAuth-capable embedding providers can power auto-recall/capture. Fixes #68950. Thanks @malshaalan-ai. - Plugins/hooks: time out never-settling `agent_end` observation hooks after 30 seconds and log the plugin failure, so hung embedding endpoints no longer leave memory capture silently pending forever. Fixes #65544. Thanks @ghoc0099. - Gateway/config: serve runtime config schemas from the current plugin metadata snapshot and generated bundled channel schema metadata instead of rebuilding plugin channel config modules on every `config.get`/`config.schema`, preventing idle plugin-discovery CPU churn after upgrades. Fixes #73088. Thanks @sleitor and @geovansb. diff --git a/extensions/telegram/src/bot-core.ts b/extensions/telegram/src/bot-core.ts index 0b32eeee1f4..f841221f6ec 100644 --- a/extensions/telegram/src/bot-core.ts +++ b/extensions/telegram/src/bot-core.ts @@ -28,12 +28,8 @@ import type { TelegramBotDeps } from "./bot-deps.js"; import { registerTelegramHandlers } from "./bot-handlers.runtime.js"; import { createTelegramMessageProcessor } from "./bot-message.js"; import { registerTelegramNativeCommands } from "./bot-native-commands.js"; -import { - buildTelegramUpdateKey, - createTelegramUpdateDedupe, - resolveTelegramUpdateId, - type TelegramUpdateKeyContext, -} from "./bot-updates.js"; +import { createTelegramUpdateTracker } from "./bot-update-tracker.js"; +import type { TelegramUpdateKeyContext } from "./bot-updates.js"; import { resolveDefaultAgentId } from "./bot.agent.runtime.js"; import { apiThrottler, Bot, sequentialize, type ApiClientOptions } from "./bot.runtime.js"; import type { TelegramBotOptions } from "./bot.types.js"; @@ -278,108 +274,37 @@ export function createTelegramBotCore( runtime.error?.(danger(`telegram bot error: ${formatUncaughtError(err)}`)); }); - const recentUpdates = createTelegramUpdateDedupe(); - const pendingUpdateKeys = new Set(); - const activeHandledUpdateKeys = new Map(); const initialUpdateId = typeof opts.updateOffset?.lastUpdateId === "number" ? opts.updateOffset.lastUpdateId : null; - const failedUpdateIds = new Set(); - let highestAcceptedUpdateId: number | null = initialUpdateId; - let highestPersistedUpdateId: number | null = initialUpdateId; - - const persistAcceptedUpdateId = (updateId: number) => { - if (highestAcceptedUpdateId !== null && updateId <= highestAcceptedUpdateId) { - return; - } - highestAcceptedUpdateId = updateId; - if (typeof opts.updateOffset?.onUpdateId !== "function") { - return; - } - if (highestPersistedUpdateId !== null && updateId <= highestPersistedUpdateId) { - return; - } - highestPersistedUpdateId = updateId; - void Promise.resolve() - .then(() => opts.updateOffset?.onUpdateId?.(updateId)) - .catch((err) => { - runtime.error?.(`telegram: failed to persist update watermark: ${formatErrorMessage(err)}`); - }); - }; - const logSkippedUpdate = (key: string) => { if (shouldLogVerbose()) { logVerbose(`telegram dedupe: skipped ${key}`); } }; - - const shouldSkipUpdate = (ctx: TelegramUpdateKeyContext) => { - const updateId = resolveTelegramUpdateId(ctx); - if (typeof updateId === "number" && initialUpdateId !== null && updateId <= initialUpdateId) { - return true; - } - const key = buildTelegramUpdateKey(ctx); - if (!key) { - return false; - } - const handled = activeHandledUpdateKeys.get(key); - if (handled != null) { - if (handled) { - logSkippedUpdate(key); - return true; - } - activeHandledUpdateKeys.set(key, true); - return false; - } - const skipped = recentUpdates.check(key); - if (skipped) { - logSkippedUpdate(key); - } - return skipped; - }; + const updateTracker = createTelegramUpdateTracker({ + initialUpdateId, + ...(typeof opts.updateOffset?.onUpdateId === "function" + ? { onAcceptedUpdateId: opts.updateOffset.onUpdateId } + : {}), + onPersistError: (err) => { + runtime.error?.(`telegram: failed to persist update watermark: ${formatErrorMessage(err)}`); + }, + onSkip: logSkippedUpdate, + }); + const shouldSkipUpdate = (ctx: TelegramUpdateKeyContext) => + updateTracker.shouldSkipHandlerDispatch(ctx); bot.use(async (ctx, next) => { - const updateId = resolveTelegramUpdateId(ctx); - const updateKey = buildTelegramUpdateKey(ctx); + const begin = updateTracker.beginUpdate(ctx); + if (!begin.accepted) { + return; + } let completed = false; - if (typeof updateId === "number") { - if (highestAcceptedUpdateId !== null && updateId <= highestAcceptedUpdateId) { - if (!failedUpdateIds.has(updateId)) { - logSkippedUpdate(`update:${updateId}`); - return; - } - } else { - failedUpdateIds.delete(updateId); - } - } - if (updateKey) { - if (pendingUpdateKeys.has(updateKey) || recentUpdates.peek(updateKey)) { - logSkippedUpdate(updateKey); - return; - } - pendingUpdateKeys.add(updateKey); - activeHandledUpdateKeys.set(updateKey, false); - } - if (typeof updateId === "number") { - persistAcceptedUpdateId(updateId); - } try { await next(); completed = true; } finally { - if (updateKey) { - activeHandledUpdateKeys.delete(updateKey); - if (completed) { - recentUpdates.check(updateKey); - } - pendingUpdateKeys.delete(updateKey); - } - if (typeof updateId === "number") { - if (completed) { - failedUpdateIds.delete(updateId); - } else { - failedUpdateIds.add(updateId); - } - } + updateTracker.finishUpdate(begin.update, { completed }); } }); diff --git a/extensions/telegram/src/bot-update-tracker.test.ts b/extensions/telegram/src/bot-update-tracker.test.ts new file mode 100644 index 00000000000..311b8a2c0df --- /dev/null +++ b/extensions/telegram/src/bot-update-tracker.test.ts @@ -0,0 +1,176 @@ +import { describe, expect, it, vi } from "vitest"; +import { + createTelegramUpdateTracker, + type TelegramUpdateTrackerState, +} from "./bot-update-tracker.js"; +import type { TelegramUpdateKeyContext } from "./bot-updates.js"; + +const updateCtx = (updateId: number): TelegramUpdateKeyContext => ({ + update: { update_id: updateId }, +}); + +async function flushTrackerMicrotasks() { + await Promise.resolve(); + await Promise.resolve(); +} + +function deferred() { + let resolve!: () => void; + const promise = new Promise((resolvePromise) => { + resolve = resolvePromise; + }); + return { promise, resolve }; +} + +describe("createTelegramUpdateTracker", () => { + it("persists accepted offsets before earlier pending updates complete", async () => { + const onAcceptedUpdateId = vi.fn(); + const tracker = createTelegramUpdateTracker({ + initialUpdateId: 100, + onAcceptedUpdateId, + }); + + const update101 = tracker.beginUpdate(updateCtx(101)); + if (!update101.accepted) { + throw new Error("expected update 101 to be accepted"); + } + await flushTrackerMicrotasks(); + expect(onAcceptedUpdateId).toHaveBeenCalledWith(101); + + const update102 = tracker.beginUpdate(updateCtx(102)); + if (!update102.accepted) { + throw new Error("expected update 102 to be accepted"); + } + tracker.finishUpdate(update102.update, { completed: true }); + await flushTrackerMicrotasks(); + + expect(onAcceptedUpdateId.mock.calls.map((call) => Number(call[0]))).toEqual([101, 102]); + expect(tracker.getState()).toMatchObject({ + highestAcceptedUpdateId: 102, + highestPersistedAcceptedUpdateId: 102, + highestCompletedUpdateId: 102, + safeCompletedUpdateId: 100, + pendingUpdateIds: [101], + failedUpdateIds: [], + } satisfies Partial); + + tracker.finishUpdate(update101.update, { completed: true }); + expect(tracker.getState()).toMatchObject({ + highestCompletedUpdateId: 102, + safeCompletedUpdateId: 102, + pendingUpdateIds: [], + } satisfies Partial); + }); + + it("skips restart replays once the accepted offset is restored", async () => { + const onAcceptedUpdateId = vi.fn(); + const firstProcess = createTelegramUpdateTracker({ + initialUpdateId: 100, + onAcceptedUpdateId, + }); + + const accepted = firstProcess.beginUpdate(updateCtx(101)); + expect(accepted.accepted).toBe(true); + await flushTrackerMicrotasks(); + + const restartedProcess = createTelegramUpdateTracker({ + initialUpdateId: Number(onAcceptedUpdateId.mock.calls.at(-1)?.[0]), + }); + + expect(restartedProcess.beginUpdate(updateCtx(101))).toEqual({ + accepted: false, + reason: "accepted-watermark", + }); + }); + + it("serializes and coalesces accepted offset persistence", async () => { + const firstWrite = deferred(); + const secondWrite = deferred(); + const writes: number[] = []; + const onAcceptedUpdateId = vi.fn((updateId: number) => { + writes.push(updateId); + if (updateId === 101) { + return firstWrite.promise; + } + return secondWrite.promise; + }); + const tracker = createTelegramUpdateTracker({ + initialUpdateId: 100, + onAcceptedUpdateId, + }); + + const update101 = tracker.beginUpdate(updateCtx(101)); + const update102 = tracker.beginUpdate(updateCtx(102)); + const update103 = tracker.beginUpdate(updateCtx(103)); + expect(update101.accepted).toBe(true); + expect(update102.accepted).toBe(true); + expect(update103.accepted).toBe(true); + + await flushTrackerMicrotasks(); + expect(writes).toEqual([101]); + expect(tracker.getState()).toMatchObject({ + highestAcceptedUpdateId: 103, + highestPersistedAcceptedUpdateId: 100, + } satisfies Partial); + + firstWrite.resolve(); + await flushTrackerMicrotasks(); + expect(writes).toEqual([101, 103]); + expect(onAcceptedUpdateId).not.toHaveBeenCalledWith(102); + + secondWrite.resolve(); + await flushTrackerMicrotasks(); + expect(tracker.getState()).toMatchObject({ + highestPersistedAcceptedUpdateId: 103, + } satisfies Partial); + }); + + it("keeps failed accepted updates retryable in the same process", () => { + const tracker = createTelegramUpdateTracker({ initialUpdateId: 200 }); + const first = tracker.beginUpdate(updateCtx(201)); + if (!first.accepted) { + throw new Error("expected first update to be accepted"); + } + tracker.finishUpdate(first.update, { completed: false }); + + expect(tracker.getState()).toMatchObject({ + highestAcceptedUpdateId: 201, + highestCompletedUpdateId: 200, + safeCompletedUpdateId: 200, + failedUpdateIds: [201], + } satisfies Partial); + + const retry = tracker.beginUpdate(updateCtx(201)); + if (!retry.accepted) { + throw new Error("expected failed update retry to be accepted"); + } + tracker.finishUpdate(retry.update, { completed: true }); + + expect(tracker.getState()).toMatchObject({ + highestAcceptedUpdateId: 201, + highestCompletedUpdateId: 201, + safeCompletedUpdateId: 201, + failedUpdateIds: [], + } satisfies Partial); + expect(tracker.beginUpdate(updateCtx(201))).toEqual({ + accepted: false, + reason: "accepted-watermark", + }); + }); + + it("dedupes handler dispatch separately from the accepted watermark", () => { + const onSkip = vi.fn(); + const tracker = createTelegramUpdateTracker({ initialUpdateId: 300, onSkip }); + const accepted = tracker.beginUpdate(updateCtx(301)); + if (!accepted.accepted) { + throw new Error("expected update to be accepted"); + } + + expect(tracker.shouldSkipHandlerDispatch(updateCtx(301))).toBe(false); + expect(tracker.shouldSkipHandlerDispatch(updateCtx(301))).toBe(true); + expect(onSkip).toHaveBeenCalledWith("update:301"); + + tracker.finishUpdate(accepted.update, { completed: true }); + expect(tracker.shouldSkipHandlerDispatch(updateCtx(301))).toBe(true); + }); +}); diff --git a/extensions/telegram/src/bot-update-tracker.ts b/extensions/telegram/src/bot-update-tracker.ts new file mode 100644 index 00000000000..e21f5662829 --- /dev/null +++ b/extensions/telegram/src/bot-update-tracker.ts @@ -0,0 +1,233 @@ +import { + buildTelegramUpdateKey, + createTelegramUpdateDedupe, + resolveTelegramUpdateId, + type TelegramUpdateKeyContext, +} from "./bot-updates.js"; + +type PersistUpdateId = (updateId: number) => void | Promise; + +type TelegramUpdateTrackerOptions = { + initialUpdateId?: number | null; + onAcceptedUpdateId?: PersistUpdateId; + onPersistError?: (error: unknown) => void; + onSkip?: (key: string) => void; +}; + +type AcceptedTelegramUpdate = { + key?: string; + updateId?: number; +}; + +type BeginUpdateResult = + | { + accepted: true; + update: AcceptedTelegramUpdate; + } + | { + accepted: false; + reason: "accepted-watermark" | "semantic-dedupe"; + }; + +type FinishUpdateOptions = { + completed: boolean; +}; + +export type TelegramUpdateTrackerState = { + highestAcceptedUpdateId: number | null; + highestPersistedAcceptedUpdateId: number | null; + highestCompletedUpdateId: number | null; + safeCompletedUpdateId: number | null; + pendingUpdateIds: number[]; + failedUpdateIds: number[]; +}; + +function sortedIds(ids: Set): number[] { + return [...ids].toSorted((a, b) => a - b); +} + +export function createTelegramUpdateTracker(options: TelegramUpdateTrackerOptions = {}) { + const initialUpdateId = + typeof options.initialUpdateId === "number" ? options.initialUpdateId : null; + const recentUpdates = createTelegramUpdateDedupe(); + const pendingUpdateKeys = new Set(); + const activeHandledUpdateKeys = new Map(); + const pendingUpdateIds = new Set(); + const failedUpdateIds = new Set(); + let highestAcceptedUpdateId: number | null = initialUpdateId; + let highestPersistedAcceptedUpdateId: number | null = initialUpdateId; + let highestPersistenceRequestedUpdateId: number | null = initialUpdateId; + let highestCompletedUpdateId: number | null = initialUpdateId; + let persistInFlight = false; + let persistTargetUpdateId: number | null = null; + + const skip = (key: string) => { + options.onSkip?.(key); + }; + + const drainPersistQueue = async () => { + const persist = options.onAcceptedUpdateId; + if (persistInFlight || typeof persist !== "function") { + return; + } + persistInFlight = true; + try { + while (persistTargetUpdateId !== null) { + const updateId = persistTargetUpdateId; + persistTargetUpdateId = null; + try { + await persist(updateId); + if ( + highestPersistedAcceptedUpdateId === null || + updateId > highestPersistedAcceptedUpdateId + ) { + highestPersistedAcceptedUpdateId = updateId; + } + } catch (err) { + options.onPersistError?.(err); + } + } + } finally { + persistInFlight = false; + } + }; + + const requestPersistAcceptedUpdateId = (updateId: number) => { + if (typeof options.onAcceptedUpdateId !== "function") { + return; + } + if ( + highestPersistenceRequestedUpdateId !== null && + updateId <= highestPersistenceRequestedUpdateId + ) { + return; + } + highestPersistenceRequestedUpdateId = updateId; + persistTargetUpdateId = updateId; + void drainPersistQueue().catch((err) => { + options.onPersistError?.(err); + }); + }; + + const acceptUpdateId = (updateId: number) => { + if (highestAcceptedUpdateId !== null && updateId <= highestAcceptedUpdateId) { + return; + } + highestAcceptedUpdateId = updateId; + requestPersistAcceptedUpdateId(updateId); + }; + + const beginUpdate = (ctx: TelegramUpdateKeyContext): BeginUpdateResult => { + const updateId = resolveTelegramUpdateId(ctx); + const updateKey = buildTelegramUpdateKey(ctx); + if (typeof updateId === "number") { + if (highestAcceptedUpdateId !== null && updateId <= highestAcceptedUpdateId) { + if (!failedUpdateIds.has(updateId)) { + skip(`update:${updateId}`); + return { accepted: false, reason: "accepted-watermark" }; + } + } else { + failedUpdateIds.delete(updateId); + } + } + if (updateKey) { + if (pendingUpdateKeys.has(updateKey) || recentUpdates.peek(updateKey)) { + skip(updateKey); + return { accepted: false, reason: "semantic-dedupe" }; + } + pendingUpdateKeys.add(updateKey); + activeHandledUpdateKeys.set(updateKey, false); + } + if (typeof updateId === "number") { + pendingUpdateIds.add(updateId); + acceptUpdateId(updateId); + } + return { + accepted: true, + update: { + ...(updateKey ? { key: updateKey } : {}), + ...(typeof updateId === "number" ? { updateId } : {}), + }, + }; + }; + + const finishUpdate = (update: AcceptedTelegramUpdate, finish: FinishUpdateOptions) => { + if (update.key) { + activeHandledUpdateKeys.delete(update.key); + if (finish.completed) { + recentUpdates.check(update.key); + } + pendingUpdateKeys.delete(update.key); + } + if (typeof update.updateId === "number") { + pendingUpdateIds.delete(update.updateId); + if (finish.completed) { + failedUpdateIds.delete(update.updateId); + if (highestCompletedUpdateId === null || update.updateId > highestCompletedUpdateId) { + highestCompletedUpdateId = update.updateId; + } + } else { + failedUpdateIds.add(update.updateId); + } + } + }; + + const shouldSkipHandlerDispatch = (ctx: TelegramUpdateKeyContext) => { + const updateId = resolveTelegramUpdateId(ctx); + if (typeof updateId === "number" && initialUpdateId !== null && updateId <= initialUpdateId) { + return true; + } + const key = buildTelegramUpdateKey(ctx); + if (!key) { + return false; + } + const handled = activeHandledUpdateKeys.get(key); + if (handled != null) { + if (handled) { + skip(key); + return true; + } + activeHandledUpdateKeys.set(key, true); + return false; + } + const skipped = recentUpdates.check(key); + if (skipped) { + skip(key); + } + return skipped; + }; + + const resolveSafeCompletedUpdateId = () => { + if (highestCompletedUpdateId === null) { + return null; + } + let safeCompletedUpdateId = highestCompletedUpdateId; + for (const updateId of pendingUpdateIds) { + if (updateId <= safeCompletedUpdateId) { + safeCompletedUpdateId = updateId - 1; + } + } + for (const updateId of failedUpdateIds) { + if (updateId <= safeCompletedUpdateId) { + safeCompletedUpdateId = updateId - 1; + } + } + return safeCompletedUpdateId; + }; + + const getState = (): TelegramUpdateTrackerState => ({ + highestAcceptedUpdateId, + highestPersistedAcceptedUpdateId, + highestCompletedUpdateId, + safeCompletedUpdateId: resolveSafeCompletedUpdateId(), + pendingUpdateIds: sortedIds(pendingUpdateIds), + failedUpdateIds: sortedIds(failedUpdateIds), + }); + + return { + beginUpdate, + finishUpdate, + getState, + shouldSkipHandlerDispatch, + }; +}