fix(telegram): centralize update offset tracking

This commit is contained in:
Peter Steinberger
2026-04-28 02:08:02 +01:00
parent 955f0a692a
commit 7c79f0ac9c
4 changed files with 429 additions and 94 deletions

View File

@@ -30,6 +30,7 @@ Docs: https://docs.openclaw.ai
- CLI/status: show skipped fast-path memory checks as `not checked` and report active custom memory plugin runtime status from `status --json --all` without requiring built-in `agents.defaults.memorySearch`, so plugins such as memory-lancedb-pro and memory-cms no longer look unavailable when their own runtime is healthy. Fixes #56968. Thanks @Tony-ooo and @aderius.
- Gateway/channels: record and log unexpected clean channel monitor exits so channels that return without throwing no longer appear stopped with no error. Fixes #73099. Thanks @balaji1968-kingler.
- Channels/Telegram: centralize polling update tracking so accepted offsets remain durable across restarts, same-process handler failures can still retry, and slow offset writes cannot overwrite newer accepted watermarks. Refs #73115. Thanks @vdruts.
- Memory/LanceDB: let embedding config use provider-backed auth profiles, environment credentials, or provider config without a separate plugin `embedding.apiKey`, so OAuth-capable embedding providers can power auto-recall/capture. Fixes #68950. Thanks @malshaalan-ai.
- Plugins/hooks: time out never-settling `agent_end` observation hooks after 30 seconds and log the plugin failure, so hung embedding endpoints no longer leave memory capture silently pending forever. Fixes #65544. Thanks @ghoc0099.
- Gateway/config: serve runtime config schemas from the current plugin metadata snapshot and generated bundled channel schema metadata instead of rebuilding plugin channel config modules on every `config.get`/`config.schema`, preventing idle plugin-discovery CPU churn after upgrades. Fixes #73088. Thanks @sleitor and @geovansb.

View File

@@ -28,12 +28,8 @@ import type { TelegramBotDeps } from "./bot-deps.js";
import { registerTelegramHandlers } from "./bot-handlers.runtime.js";
import { createTelegramMessageProcessor } from "./bot-message.js";
import { registerTelegramNativeCommands } from "./bot-native-commands.js";
import {
buildTelegramUpdateKey,
createTelegramUpdateDedupe,
resolveTelegramUpdateId,
type TelegramUpdateKeyContext,
} from "./bot-updates.js";
import { createTelegramUpdateTracker } from "./bot-update-tracker.js";
import type { TelegramUpdateKeyContext } from "./bot-updates.js";
import { resolveDefaultAgentId } from "./bot.agent.runtime.js";
import { apiThrottler, Bot, sequentialize, type ApiClientOptions } from "./bot.runtime.js";
import type { TelegramBotOptions } from "./bot.types.js";
@@ -278,108 +274,37 @@ export function createTelegramBotCore(
runtime.error?.(danger(`telegram bot error: ${formatUncaughtError(err)}`));
});
const recentUpdates = createTelegramUpdateDedupe();
const pendingUpdateKeys = new Set<string>();
const activeHandledUpdateKeys = new Map<string, boolean>();
const initialUpdateId =
typeof opts.updateOffset?.lastUpdateId === "number" ? opts.updateOffset.lastUpdateId : null;
const failedUpdateIds = new Set<number>();
let highestAcceptedUpdateId: number | null = initialUpdateId;
let highestPersistedUpdateId: number | null = initialUpdateId;
const persistAcceptedUpdateId = (updateId: number) => {
if (highestAcceptedUpdateId !== null && updateId <= highestAcceptedUpdateId) {
return;
}
highestAcceptedUpdateId = updateId;
if (typeof opts.updateOffset?.onUpdateId !== "function") {
return;
}
if (highestPersistedUpdateId !== null && updateId <= highestPersistedUpdateId) {
return;
}
highestPersistedUpdateId = updateId;
void Promise.resolve()
.then(() => opts.updateOffset?.onUpdateId?.(updateId))
.catch((err) => {
runtime.error?.(`telegram: failed to persist update watermark: ${formatErrorMessage(err)}`);
});
};
const logSkippedUpdate = (key: string) => {
if (shouldLogVerbose()) {
logVerbose(`telegram dedupe: skipped ${key}`);
}
};
const shouldSkipUpdate = (ctx: TelegramUpdateKeyContext) => {
const updateId = resolveTelegramUpdateId(ctx);
if (typeof updateId === "number" && initialUpdateId !== null && updateId <= initialUpdateId) {
return true;
}
const key = buildTelegramUpdateKey(ctx);
if (!key) {
return false;
}
const handled = activeHandledUpdateKeys.get(key);
if (handled != null) {
if (handled) {
logSkippedUpdate(key);
return true;
}
activeHandledUpdateKeys.set(key, true);
return false;
}
const skipped = recentUpdates.check(key);
if (skipped) {
logSkippedUpdate(key);
}
return skipped;
};
const updateTracker = createTelegramUpdateTracker({
initialUpdateId,
...(typeof opts.updateOffset?.onUpdateId === "function"
? { onAcceptedUpdateId: opts.updateOffset.onUpdateId }
: {}),
onPersistError: (err) => {
runtime.error?.(`telegram: failed to persist update watermark: ${formatErrorMessage(err)}`);
},
onSkip: logSkippedUpdate,
});
const shouldSkipUpdate = (ctx: TelegramUpdateKeyContext) =>
updateTracker.shouldSkipHandlerDispatch(ctx);
bot.use(async (ctx, next) => {
const updateId = resolveTelegramUpdateId(ctx);
const updateKey = buildTelegramUpdateKey(ctx);
const begin = updateTracker.beginUpdate(ctx);
if (!begin.accepted) {
return;
}
let completed = false;
if (typeof updateId === "number") {
if (highestAcceptedUpdateId !== null && updateId <= highestAcceptedUpdateId) {
if (!failedUpdateIds.has(updateId)) {
logSkippedUpdate(`update:${updateId}`);
return;
}
} else {
failedUpdateIds.delete(updateId);
}
}
if (updateKey) {
if (pendingUpdateKeys.has(updateKey) || recentUpdates.peek(updateKey)) {
logSkippedUpdate(updateKey);
return;
}
pendingUpdateKeys.add(updateKey);
activeHandledUpdateKeys.set(updateKey, false);
}
if (typeof updateId === "number") {
persistAcceptedUpdateId(updateId);
}
try {
await next();
completed = true;
} finally {
if (updateKey) {
activeHandledUpdateKeys.delete(updateKey);
if (completed) {
recentUpdates.check(updateKey);
}
pendingUpdateKeys.delete(updateKey);
}
if (typeof updateId === "number") {
if (completed) {
failedUpdateIds.delete(updateId);
} else {
failedUpdateIds.add(updateId);
}
}
updateTracker.finishUpdate(begin.update, { completed });
}
});

View File

@@ -0,0 +1,176 @@
import { describe, expect, it, vi } from "vitest";
import {
createTelegramUpdateTracker,
type TelegramUpdateTrackerState,
} from "./bot-update-tracker.js";
import type { TelegramUpdateKeyContext } from "./bot-updates.js";
const updateCtx = (updateId: number): TelegramUpdateKeyContext => ({
update: { update_id: updateId },
});
async function flushTrackerMicrotasks() {
await Promise.resolve();
await Promise.resolve();
}
function deferred() {
let resolve!: () => void;
const promise = new Promise<void>((resolvePromise) => {
resolve = resolvePromise;
});
return { promise, resolve };
}
describe("createTelegramUpdateTracker", () => {
it("persists accepted offsets before earlier pending updates complete", async () => {
const onAcceptedUpdateId = vi.fn();
const tracker = createTelegramUpdateTracker({
initialUpdateId: 100,
onAcceptedUpdateId,
});
const update101 = tracker.beginUpdate(updateCtx(101));
if (!update101.accepted) {
throw new Error("expected update 101 to be accepted");
}
await flushTrackerMicrotasks();
expect(onAcceptedUpdateId).toHaveBeenCalledWith(101);
const update102 = tracker.beginUpdate(updateCtx(102));
if (!update102.accepted) {
throw new Error("expected update 102 to be accepted");
}
tracker.finishUpdate(update102.update, { completed: true });
await flushTrackerMicrotasks();
expect(onAcceptedUpdateId.mock.calls.map((call) => Number(call[0]))).toEqual([101, 102]);
expect(tracker.getState()).toMatchObject({
highestAcceptedUpdateId: 102,
highestPersistedAcceptedUpdateId: 102,
highestCompletedUpdateId: 102,
safeCompletedUpdateId: 100,
pendingUpdateIds: [101],
failedUpdateIds: [],
} satisfies Partial<TelegramUpdateTrackerState>);
tracker.finishUpdate(update101.update, { completed: true });
expect(tracker.getState()).toMatchObject({
highestCompletedUpdateId: 102,
safeCompletedUpdateId: 102,
pendingUpdateIds: [],
} satisfies Partial<TelegramUpdateTrackerState>);
});
it("skips restart replays once the accepted offset is restored", async () => {
const onAcceptedUpdateId = vi.fn();
const firstProcess = createTelegramUpdateTracker({
initialUpdateId: 100,
onAcceptedUpdateId,
});
const accepted = firstProcess.beginUpdate(updateCtx(101));
expect(accepted.accepted).toBe(true);
await flushTrackerMicrotasks();
const restartedProcess = createTelegramUpdateTracker({
initialUpdateId: Number(onAcceptedUpdateId.mock.calls.at(-1)?.[0]),
});
expect(restartedProcess.beginUpdate(updateCtx(101))).toEqual({
accepted: false,
reason: "accepted-watermark",
});
});
it("serializes and coalesces accepted offset persistence", async () => {
const firstWrite = deferred();
const secondWrite = deferred();
const writes: number[] = [];
const onAcceptedUpdateId = vi.fn((updateId: number) => {
writes.push(updateId);
if (updateId === 101) {
return firstWrite.promise;
}
return secondWrite.promise;
});
const tracker = createTelegramUpdateTracker({
initialUpdateId: 100,
onAcceptedUpdateId,
});
const update101 = tracker.beginUpdate(updateCtx(101));
const update102 = tracker.beginUpdate(updateCtx(102));
const update103 = tracker.beginUpdate(updateCtx(103));
expect(update101.accepted).toBe(true);
expect(update102.accepted).toBe(true);
expect(update103.accepted).toBe(true);
await flushTrackerMicrotasks();
expect(writes).toEqual([101]);
expect(tracker.getState()).toMatchObject({
highestAcceptedUpdateId: 103,
highestPersistedAcceptedUpdateId: 100,
} satisfies Partial<TelegramUpdateTrackerState>);
firstWrite.resolve();
await flushTrackerMicrotasks();
expect(writes).toEqual([101, 103]);
expect(onAcceptedUpdateId).not.toHaveBeenCalledWith(102);
secondWrite.resolve();
await flushTrackerMicrotasks();
expect(tracker.getState()).toMatchObject({
highestPersistedAcceptedUpdateId: 103,
} satisfies Partial<TelegramUpdateTrackerState>);
});
it("keeps failed accepted updates retryable in the same process", () => {
const tracker = createTelegramUpdateTracker({ initialUpdateId: 200 });
const first = tracker.beginUpdate(updateCtx(201));
if (!first.accepted) {
throw new Error("expected first update to be accepted");
}
tracker.finishUpdate(first.update, { completed: false });
expect(tracker.getState()).toMatchObject({
highestAcceptedUpdateId: 201,
highestCompletedUpdateId: 200,
safeCompletedUpdateId: 200,
failedUpdateIds: [201],
} satisfies Partial<TelegramUpdateTrackerState>);
const retry = tracker.beginUpdate(updateCtx(201));
if (!retry.accepted) {
throw new Error("expected failed update retry to be accepted");
}
tracker.finishUpdate(retry.update, { completed: true });
expect(tracker.getState()).toMatchObject({
highestAcceptedUpdateId: 201,
highestCompletedUpdateId: 201,
safeCompletedUpdateId: 201,
failedUpdateIds: [],
} satisfies Partial<TelegramUpdateTrackerState>);
expect(tracker.beginUpdate(updateCtx(201))).toEqual({
accepted: false,
reason: "accepted-watermark",
});
});
it("dedupes handler dispatch separately from the accepted watermark", () => {
const onSkip = vi.fn();
const tracker = createTelegramUpdateTracker({ initialUpdateId: 300, onSkip });
const accepted = tracker.beginUpdate(updateCtx(301));
if (!accepted.accepted) {
throw new Error("expected update to be accepted");
}
expect(tracker.shouldSkipHandlerDispatch(updateCtx(301))).toBe(false);
expect(tracker.shouldSkipHandlerDispatch(updateCtx(301))).toBe(true);
expect(onSkip).toHaveBeenCalledWith("update:301");
tracker.finishUpdate(accepted.update, { completed: true });
expect(tracker.shouldSkipHandlerDispatch(updateCtx(301))).toBe(true);
});
});

View File

@@ -0,0 +1,233 @@
import {
buildTelegramUpdateKey,
createTelegramUpdateDedupe,
resolveTelegramUpdateId,
type TelegramUpdateKeyContext,
} from "./bot-updates.js";
type PersistUpdateId = (updateId: number) => void | Promise<void>;
type TelegramUpdateTrackerOptions = {
initialUpdateId?: number | null;
onAcceptedUpdateId?: PersistUpdateId;
onPersistError?: (error: unknown) => void;
onSkip?: (key: string) => void;
};
type AcceptedTelegramUpdate = {
key?: string;
updateId?: number;
};
type BeginUpdateResult =
| {
accepted: true;
update: AcceptedTelegramUpdate;
}
| {
accepted: false;
reason: "accepted-watermark" | "semantic-dedupe";
};
type FinishUpdateOptions = {
completed: boolean;
};
export type TelegramUpdateTrackerState = {
highestAcceptedUpdateId: number | null;
highestPersistedAcceptedUpdateId: number | null;
highestCompletedUpdateId: number | null;
safeCompletedUpdateId: number | null;
pendingUpdateIds: number[];
failedUpdateIds: number[];
};
function sortedIds(ids: Set<number>): number[] {
return [...ids].toSorted((a, b) => a - b);
}
export function createTelegramUpdateTracker(options: TelegramUpdateTrackerOptions = {}) {
const initialUpdateId =
typeof options.initialUpdateId === "number" ? options.initialUpdateId : null;
const recentUpdates = createTelegramUpdateDedupe();
const pendingUpdateKeys = new Set<string>();
const activeHandledUpdateKeys = new Map<string, boolean>();
const pendingUpdateIds = new Set<number>();
const failedUpdateIds = new Set<number>();
let highestAcceptedUpdateId: number | null = initialUpdateId;
let highestPersistedAcceptedUpdateId: number | null = initialUpdateId;
let highestPersistenceRequestedUpdateId: number | null = initialUpdateId;
let highestCompletedUpdateId: number | null = initialUpdateId;
let persistInFlight = false;
let persistTargetUpdateId: number | null = null;
const skip = (key: string) => {
options.onSkip?.(key);
};
const drainPersistQueue = async () => {
const persist = options.onAcceptedUpdateId;
if (persistInFlight || typeof persist !== "function") {
return;
}
persistInFlight = true;
try {
while (persistTargetUpdateId !== null) {
const updateId = persistTargetUpdateId;
persistTargetUpdateId = null;
try {
await persist(updateId);
if (
highestPersistedAcceptedUpdateId === null ||
updateId > highestPersistedAcceptedUpdateId
) {
highestPersistedAcceptedUpdateId = updateId;
}
} catch (err) {
options.onPersistError?.(err);
}
}
} finally {
persistInFlight = false;
}
};
const requestPersistAcceptedUpdateId = (updateId: number) => {
if (typeof options.onAcceptedUpdateId !== "function") {
return;
}
if (
highestPersistenceRequestedUpdateId !== null &&
updateId <= highestPersistenceRequestedUpdateId
) {
return;
}
highestPersistenceRequestedUpdateId = updateId;
persistTargetUpdateId = updateId;
void drainPersistQueue().catch((err) => {
options.onPersistError?.(err);
});
};
const acceptUpdateId = (updateId: number) => {
if (highestAcceptedUpdateId !== null && updateId <= highestAcceptedUpdateId) {
return;
}
highestAcceptedUpdateId = updateId;
requestPersistAcceptedUpdateId(updateId);
};
const beginUpdate = (ctx: TelegramUpdateKeyContext): BeginUpdateResult => {
const updateId = resolveTelegramUpdateId(ctx);
const updateKey = buildTelegramUpdateKey(ctx);
if (typeof updateId === "number") {
if (highestAcceptedUpdateId !== null && updateId <= highestAcceptedUpdateId) {
if (!failedUpdateIds.has(updateId)) {
skip(`update:${updateId}`);
return { accepted: false, reason: "accepted-watermark" };
}
} else {
failedUpdateIds.delete(updateId);
}
}
if (updateKey) {
if (pendingUpdateKeys.has(updateKey) || recentUpdates.peek(updateKey)) {
skip(updateKey);
return { accepted: false, reason: "semantic-dedupe" };
}
pendingUpdateKeys.add(updateKey);
activeHandledUpdateKeys.set(updateKey, false);
}
if (typeof updateId === "number") {
pendingUpdateIds.add(updateId);
acceptUpdateId(updateId);
}
return {
accepted: true,
update: {
...(updateKey ? { key: updateKey } : {}),
...(typeof updateId === "number" ? { updateId } : {}),
},
};
};
const finishUpdate = (update: AcceptedTelegramUpdate, finish: FinishUpdateOptions) => {
if (update.key) {
activeHandledUpdateKeys.delete(update.key);
if (finish.completed) {
recentUpdates.check(update.key);
}
pendingUpdateKeys.delete(update.key);
}
if (typeof update.updateId === "number") {
pendingUpdateIds.delete(update.updateId);
if (finish.completed) {
failedUpdateIds.delete(update.updateId);
if (highestCompletedUpdateId === null || update.updateId > highestCompletedUpdateId) {
highestCompletedUpdateId = update.updateId;
}
} else {
failedUpdateIds.add(update.updateId);
}
}
};
const shouldSkipHandlerDispatch = (ctx: TelegramUpdateKeyContext) => {
const updateId = resolveTelegramUpdateId(ctx);
if (typeof updateId === "number" && initialUpdateId !== null && updateId <= initialUpdateId) {
return true;
}
const key = buildTelegramUpdateKey(ctx);
if (!key) {
return false;
}
const handled = activeHandledUpdateKeys.get(key);
if (handled != null) {
if (handled) {
skip(key);
return true;
}
activeHandledUpdateKeys.set(key, true);
return false;
}
const skipped = recentUpdates.check(key);
if (skipped) {
skip(key);
}
return skipped;
};
const resolveSafeCompletedUpdateId = () => {
if (highestCompletedUpdateId === null) {
return null;
}
let safeCompletedUpdateId = highestCompletedUpdateId;
for (const updateId of pendingUpdateIds) {
if (updateId <= safeCompletedUpdateId) {
safeCompletedUpdateId = updateId - 1;
}
}
for (const updateId of failedUpdateIds) {
if (updateId <= safeCompletedUpdateId) {
safeCompletedUpdateId = updateId - 1;
}
}
return safeCompletedUpdateId;
};
const getState = (): TelegramUpdateTrackerState => ({
highestAcceptedUpdateId,
highestPersistedAcceptedUpdateId,
highestCompletedUpdateId,
safeCompletedUpdateId: resolveSafeCompletedUpdateId(),
pendingUpdateIds: sortedIds(pendingUpdateIds),
failedUpdateIds: sortedIds(failedUpdateIds),
});
return {
beginUpdate,
finishUpdate,
getState,
shouldSkipHandlerDispatch,
};
}