diff --git a/CHANGELOG.md b/CHANGELOG.md index daa694d4664..aa54e2b0893 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ Docs: https://docs.openclaw.ai - Security/Hooks transforms: enforce symlink-safe containment for webhook transform module paths (including `hooks.transformsDir` and `hooks.mappings[].transform.module`) by resolving existing-path ancestors via realpath before import, while preserving in-root symlink support; add regression coverage for both escape and allow cases. This ships in the next npm release. Thanks @aether-ai-agent for reporting. - Telegram/WSL2: disable `autoSelectFamily` by default on WSL2 and memoize WSL2 detection in Telegram network decision logic to avoid repeated sync `/proc/version` probes on fetch/send paths. (#21916) Thanks @MizukiMachine. - Telegram/Streaming: preserve archived draft preview mapping after flush and clean superseded reasoning preview bubbles so multi-message preview finals no longer cross-edit or orphan stale messages under send/rotation races. (#23202) Thanks @obviyus. +- Telegram/Polling: persist a safe update-offset watermark bounded by pending updates so crash/restart cannot skip queued lower `update_id` updates after out-of-order completion. (#23284) thanks @frankekn. - Slack/Slash commands: preserve the Bolt app receiver when registering external select options handlers so monitor startup does not crash on runtimes that require bound `app.options` calls. (#23209) Thanks @0xgaia. - Slack/Telegram slash sessions: await session metadata persistence before dispatch so first-turn native slash runs do not race session-origin metadata updates. (#23065) thanks @hydro13. - Agents/Ollama: preserve unsafe integer tool-call arguments as exact strings during NDJSON parsing, preventing large numeric IDs from being rounded before tool execution. (#23170) Thanks @BestJoester. diff --git a/src/telegram/bot.create-telegram-bot.test.ts b/src/telegram/bot.create-telegram-bot.test.ts index c5c38b8dd33..ba72eb01af8 100644 --- a/src/telegram/bot.create-telegram-bot.test.ts +++ b/src/telegram/bot.create-telegram-bot.test.ts @@ -445,6 +445,83 @@ describe("createTelegramBot", () => { }); expect(replySpy).toHaveBeenCalledTimes(1); }); + + it("does not persist update offset past pending updates", async () => { + // For this test we need sequentialize(...) to behave like a normal middleware and call next(). + sequentializeSpy.mockImplementationOnce( + () => async (_ctx: unknown, next: () => Promise) => { + await next(); + }, + ); + + const onUpdateId = vi.fn(); + loadConfig.mockReturnValue({ + channels: { telegram: { dmPolicy: "open", allowFrom: ["*"] } }, + }); + + createTelegramBot({ + token: "tok", + updateOffset: { + lastUpdateId: 100, + onUpdateId, + }, + }); + + type Middleware = ( + ctx: Record, + next: () => Promise, + ) => Promise | void; + + const middlewares = middlewareUseSpy.mock.calls + .map((call) => call[0]) + .filter((fn): fn is Middleware => typeof fn === "function"); + + const runMiddlewareChain = async ( + ctx: Record, + finalNext: () => Promise, + ) => { + let idx = -1; + const dispatch = async (i: number): Promise => { + if (i <= idx) { + throw new Error("middleware dispatch called multiple times"); + } + idx = i; + const fn = middlewares[i]; + if (!fn) { + await finalNext(); + return; + } + await fn(ctx, async () => dispatch(i + 1)); + }; + await dispatch(0); + }; + + let releaseUpdate101: (() => void) | undefined; + const update101Gate = new Promise((resolve) => { + releaseUpdate101 = resolve; + }); + + // Start processing update 101 but keep it pending (simulates an update queued behind sequentialize()). + const p101 = runMiddlewareChain({ update: { update_id: 101 } }, async () => update101Gate); + // Let update 101 enter the chain and mark itself pending before 102 completes. + await Promise.resolve(); + + // Complete update 102 while 101 is still pending. The persisted watermark must not jump to 102. + await runMiddlewareChain({ update: { update_id: 102 } }, async () => {}); + + const persistedValues = onUpdateId.mock.calls.map((call) => Number(call[0])); + const maxPersisted = persistedValues.length > 0 ? Math.max(...persistedValues) : -Infinity; + expect(maxPersisted).toBeLessThan(101); + + releaseUpdate101?.(); + await p101; + + // Once the pending update finishes, the watermark can safely catch up. + const persistedAfterDrain = onUpdateId.mock.calls.map((call) => Number(call[0])); + const maxPersistedAfterDrain = + persistedAfterDrain.length > 0 ? Math.max(...persistedAfterDrain) : -Infinity; + expect(maxPersistedAfterDrain).toBe(102); + }); it("allows distinct callback_query ids without update_id", async () => { loadConfig.mockReturnValue({ channels: { diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index 9bca2dfc6c4..7485d0dac69 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -148,34 +148,53 @@ export function createTelegramBot(opts: TelegramBotOptions) { const bot = new Bot(opts.token, client ? { client } : undefined); bot.api.config.use(apiThrottler()); - bot.use(sequentialize(getTelegramSequentialKey)); // Catch all errors from bot middleware to prevent unhandled rejections bot.catch((err) => { runtime.error?.(danger(`telegram bot error: ${formatUncaughtError(err)}`)); }); const recentUpdates = createTelegramUpdateDedupe(); - let lastUpdateId = + const initialUpdateId = typeof opts.updateOffset?.lastUpdateId === "number" ? opts.updateOffset.lastUpdateId : null; - const recordUpdateId = (ctx: TelegramUpdateKeyContext) => { - const updateId = resolveTelegramUpdateId(ctx); - if (typeof updateId !== "number") { + // Track update_ids that have entered the middleware pipeline but have not completed yet. + // This includes updates that are "queued" behind sequentialize(...) for a chat/topic key. + // We only persist a watermark that is strictly less than the smallest pending update_id, + // so we never write an offset that would skip an update still waiting to run. + const pendingUpdateIds = new Set(); + let highestCompletedUpdateId: number | null = initialUpdateId; + let highestPersistedUpdateId: number | null = initialUpdateId; + const maybePersistSafeWatermark = () => { + if (typeof opts.updateOffset?.onUpdateId !== "function") { return; } - if (lastUpdateId !== null && updateId <= lastUpdateId) { + if (highestCompletedUpdateId === null) { return; } - lastUpdateId = updateId; - void opts.updateOffset?.onUpdateId?.(updateId); + let safe = highestCompletedUpdateId; + if (pendingUpdateIds.size > 0) { + let minPending: number | null = null; + for (const id of pendingUpdateIds) { + if (minPending === null || id < minPending) { + minPending = id; + } + } + if (minPending !== null) { + safe = Math.min(safe, minPending - 1); + } + } + if (highestPersistedUpdateId !== null && safe <= highestPersistedUpdateId) { + return; + } + highestPersistedUpdateId = safe; + void opts.updateOffset.onUpdateId(safe); }; const shouldSkipUpdate = (ctx: TelegramUpdateKeyContext) => { const updateId = resolveTelegramUpdateId(ctx); - if (typeof updateId === "number" && lastUpdateId !== null) { - if (updateId <= lastUpdateId) { - return true; - } + const skipCutoff = highestPersistedUpdateId ?? initialUpdateId; + if (typeof updateId === "number" && skipCutoff !== null && updateId <= skipCutoff) { + return true; } const key = buildTelegramUpdateKey(ctx); const skipped = recentUpdates.check(key); @@ -185,6 +204,26 @@ export function createTelegramBot(opts: TelegramBotOptions) { return skipped; }; + bot.use(async (ctx, next) => { + const updateId = resolveTelegramUpdateId(ctx); + if (typeof updateId === "number") { + pendingUpdateIds.add(updateId); + } + try { + await next(); + } finally { + if (typeof updateId === "number") { + pendingUpdateIds.delete(updateId); + if (highestCompletedUpdateId === null || updateId > highestCompletedUpdateId) { + highestCompletedUpdateId = updateId; + } + maybePersistSafeWatermark(); + } + } + }); + + bot.use(sequentialize(getTelegramSequentialKey)); + const rawUpdateLogger = createSubsystemLogger("gateway/channels/telegram/raw-update"); const MAX_RAW_UPDATE_CHARS = 8000; const MAX_RAW_UPDATE_STRING = 500; @@ -223,7 +262,6 @@ export function createTelegramBot(opts: TelegramBotOptions) { } } await next(); - recordUpdateId(ctx); }); const historyLimit = Math.max(