From ae57a7998ef06886ef81a840a89a95f5c3604729 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 25 Apr 2026 01:34:41 +0100 Subject: [PATCH] fix(telegram): persist accepted update offsets --- CHANGELOG.md | 2 + extensions/telegram/src/bot-core.ts | 72 ++++------- .../src/bot.create-telegram-bot.test.ts | 118 ++++++++++++++---- scripts/copy-bundled-plugin-metadata.mjs | 96 +++++++++++++- .../copy-bundled-plugin-metadata.test.ts | 68 +++++++++- 5 files changed, 280 insertions(+), 76 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 529db9613a5..0cdb968df6b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ Docs: https://docs.openclaw.ai - Agents/failover: forward embedded run abort signals into provider-owned model streams, cap implicit LLM idle watchdogs below long run timeouts, and mark 429 responses without usable retry timing as non-retryable so GitHub Copilot rate limits fail over or surface promptly instead of hanging until run timeout. Fixes #71120. - Plugins/Google Meet: make meeting creation join by default, with an explicit URL-only opt-out, so agents that create a Meet also enter it. +- Telegram/polling: persist accepted update offsets before long-running handlers complete so poller restarts do not replay already-ingested updates, while keeping same-process retries for handler failures. +- Telegram/config: include generated Telegram channel config schema metadata in packaged plugin manifests so forum-topic/group config is accepted before runtime loads. - Browser/tool: keep explicit AI snapshots from inheriting the efficient role-snapshot default and preserve numeric Playwright AI refs, so `--format ai` remains a real AI snapshot path. Fixes #62550. Thanks @ly85206559. - Gateway/config: keep in-process config patch reload comparisons on the resolved source snapshot when `${VAR}` env refs are restored on disk, avoiding false full gateway restarts for unchanged gateway/plugin secrets. Fixes #71208. Thanks @robbiethompson18. - Slack/messages: serialize write-client requests and whole outbound sends per target so rapid multi-message Slack replies preserve send order. Fixes #69101. (#69105) Thanks @nightq and @ztexydt-cqh. diff --git a/extensions/telegram/src/bot-core.ts b/extensions/telegram/src/bot-core.ts index f46717b486b..3b649720e00 100644 --- a/extensions/telegram/src/bot-core.ts +++ b/extensions/telegram/src/bot-core.ts @@ -283,51 +283,24 @@ export function createTelegramBotCore( const activeHandledUpdateKeys = new Map(); const initialUpdateId = typeof opts.updateOffset?.lastUpdateId === "number" ? opts.updateOffset.lastUpdateId : null; - - // Track update_ids that have entered the middleware pipeline but have not completed yet. - // This includes updates that are "queued" behind sequentialize(...) for a chat/topic key. - // We only persist a watermark that is strictly less than the smallest pending update_id, - // so we never write an offset that would skip an update still waiting to run. - const pendingUpdateIds = new Set(); const failedUpdateIds = new Set(); - let highestCompletedUpdateId: number | null = initialUpdateId; + let highestAcceptedUpdateId: number | null = initialUpdateId; let highestPersistedUpdateId: number | null = initialUpdateId; - const maybePersistSafeWatermark = () => { + + const persistAcceptedUpdateId = (updateId: number) => { + if (highestAcceptedUpdateId !== null && updateId <= highestAcceptedUpdateId) { + return; + } + highestAcceptedUpdateId = updateId; if (typeof opts.updateOffset?.onUpdateId !== "function") { return; } - if (highestCompletedUpdateId === null) { + if (highestPersistedUpdateId !== null && updateId <= highestPersistedUpdateId) { return; } - let safe = highestCompletedUpdateId; - if (pendingUpdateIds.size > 0) { - let minPending: number | null = null; - for (const id of pendingUpdateIds) { - if (minPending === null || id < minPending) { - minPending = id; - } - } - if (minPending !== null) { - safe = Math.min(safe, minPending - 1); - } - } - if (failedUpdateIds.size > 0) { - let minFailed: number | null = null; - for (const id of failedUpdateIds) { - if (minFailed === null || id < minFailed) { - minFailed = id; - } - } - if (minFailed !== null) { - safe = Math.min(safe, minFailed - 1); - } - } - if (highestPersistedUpdateId !== null && safe <= highestPersistedUpdateId) { - return; - } - highestPersistedUpdateId = safe; + highestPersistedUpdateId = updateId; void Promise.resolve() - .then(() => opts.updateOffset?.onUpdateId?.(safe)) + .then(() => opts.updateOffset?.onUpdateId?.(updateId)) .catch((err) => { runtime.error?.(`telegram: failed to persist update watermark: ${formatErrorMessage(err)}`); }); @@ -341,8 +314,7 @@ export function createTelegramBotCore( const shouldSkipUpdate = (ctx: TelegramUpdateKeyContext) => { const updateId = resolveTelegramUpdateId(ctx); - const skipCutoff = highestPersistedUpdateId ?? initialUpdateId; - if (typeof updateId === "number" && skipCutoff !== null && updateId <= skipCutoff) { + if (typeof updateId === "number" && initialUpdateId !== null && updateId <= initialUpdateId) { return true; } const key = buildTelegramUpdateKey(ctx); @@ -370,20 +342,26 @@ export function createTelegramBotCore( const updateKey = buildTelegramUpdateKey(ctx); let completed = false; if (typeof updateId === "number") { - failedUpdateIds.delete(updateId); - pendingUpdateIds.add(updateId); + if (highestAcceptedUpdateId !== null && updateId <= highestAcceptedUpdateId) { + if (!failedUpdateIds.has(updateId)) { + logSkippedUpdate(`update:${updateId}`); + return; + } + } else { + failedUpdateIds.delete(updateId); + } } if (updateKey) { if (pendingUpdateKeys.has(updateKey) || recentUpdates.peek(updateKey)) { logSkippedUpdate(updateKey); - if (typeof updateId === "number") { - pendingUpdateIds.delete(updateId); - } return; } pendingUpdateKeys.add(updateKey); activeHandledUpdateKeys.set(updateKey, false); } + if (typeof updateId === "number") { + persistAcceptedUpdateId(updateId); + } try { await next(); completed = true; @@ -396,12 +374,8 @@ export function createTelegramBotCore( pendingUpdateKeys.delete(updateKey); } if (typeof updateId === "number") { - pendingUpdateIds.delete(updateId); if (completed) { - if (highestCompletedUpdateId === null || updateId > highestCompletedUpdateId) { - highestCompletedUpdateId = updateId; - } - maybePersistSafeWatermark(); + failedUpdateIds.delete(updateId); } else { failedUpdateIds.add(updateId); } diff --git a/extensions/telegram/src/bot.create-telegram-bot.test.ts b/extensions/telegram/src/bot.create-telegram-bot.test.ts index a7bbaf801b4..d4053daccfd 100644 --- a/extensions/telegram/src/bot.create-telegram-bot.test.ts +++ b/extensions/telegram/src/bot.create-telegram-bot.test.ts @@ -1091,7 +1091,7 @@ describe("createTelegramBot", () => { expect(replySpy).toHaveBeenCalledTimes(1); }); - it("does not persist update offset past pending updates", async () => { + it("persists accepted update offsets before completion", async () => { // For this test we need sequentialize(...) to behave like a normal middleware and call next(). sequentializeSpy.mockImplementationOnce( () => async (_ctx: unknown, next: () => Promise) => { @@ -1146,26 +1146,20 @@ describe("createTelegramBot", () => { releaseUpdate101 = resolve; }); - // Start processing update 101 but keep it pending (simulates an update queued behind sequentialize()). + // Start processing update 101 but keep it pending (simulates a long-running turn). const p101 = runMiddlewareChain({ update: { update_id: 101 } }, async () => update101Gate); - // Let update 101 enter the chain and mark itself pending before 102 completes. + // Let update 101 enter the chain and persist acceptance before 102 completes. await Promise.resolve(); + expect(onUpdateId).toHaveBeenCalledWith(101); - // Complete update 102 while 101 is still pending. The persisted watermark must not jump to 102. + // Complete update 102 while 101 is still pending. Restart replay protection is at-most-once. await runMiddlewareChain({ update: { update_id: 102 } }, async () => {}); - - const persistedValues = onUpdateId.mock.calls.map((call) => Number(call[0])); - const maxPersisted = persistedValues.length > 0 ? Math.max(...persistedValues) : -Infinity; - expect(maxPersisted).toBeLessThan(101); + expect(onUpdateId).toHaveBeenCalledWith(102); releaseUpdate101?.(); await p101; - // Once the pending update finishes, the watermark can safely catch up. - const persistedAfterDrain = onUpdateId.mock.calls.map((call) => Number(call[0])); - const maxPersistedAfterDrain = - persistedAfterDrain.length > 0 ? Math.max(...persistedAfterDrain) : -Infinity; - expect(maxPersistedAfterDrain).toBe(102); + expect(onUpdateId.mock.calls.map((call) => Number(call[0]))).toEqual([101, 102]); }); it("logs and swallows update watermark persistence failures", async () => { sequentializeSpy.mockImplementationOnce( @@ -1237,7 +1231,7 @@ describe("createTelegramBot", () => { } }); - it("does not persist failed updates into the watermark", async () => { + it("persists failed updates once accepted while preserving same-process retries", async () => { sequentializeSpy.mockImplementationOnce( () => async (_ctx: unknown, next: () => Promise) => { await next(); @@ -1288,18 +1282,100 @@ describe("createTelegramBot", () => { throw new Error("middleware boom"); }), ).rejects.toThrow("middleware boom"); + await flushTelegramTestMicrotasks(); + expect(onUpdateId).toHaveBeenCalledWith(201); await runMiddlewareChain({ update: { update_id: 202 } }, async () => {}); - await new Promise((resolve) => setTimeout(resolve, 0)); - expect(onUpdateId).not.toHaveBeenCalled(); - expect(onUpdateId).not.toHaveBeenCalledWith(201); - expect(onUpdateId).not.toHaveBeenCalledWith(202); - - await runMiddlewareChain({ update: { update_id: 201 } }, async () => {}); - await flushTelegramTestMicrotasks(); expect(onUpdateId).toHaveBeenCalledWith(202); + + const retryHandler = vi.fn(); + await runMiddlewareChain({ update: { update_id: 201 } }, async () => { + retryHandler(); + }); + + await flushTelegramTestMicrotasks(); + expect(retryHandler).toHaveBeenCalledTimes(1); + expect(onUpdateId.mock.calls.map((call) => Number(call[0]))).toEqual([201, 202]); + }); + + it("skips replayed update ids even when the semantic update key differs", async () => { + sequentializeSpy.mockImplementationOnce( + () => async (_ctx: unknown, next: () => Promise) => { + await next(); + }, + ); + + const onUpdateId = vi.fn(); + + createTelegramBot({ + token: "tok", + updateOffset: { + lastUpdateId: 300, + onUpdateId, + }, + }); + + type Middleware = ( + ctx: Record, + next: () => Promise, + ) => Promise | void; + + const middlewares = middlewareUseSpy.mock.calls + .map((call) => call[0]) + .filter((fn): fn is Middleware => typeof fn === "function"); + + const runMiddlewareChain = async ( + ctx: Record, + finalNext: () => Promise, + ) => { + let idx = -1; + const dispatch = async (i: number): Promise => { + if (i <= idx) { + throw new Error("middleware dispatch called multiple times"); + } + idx = i; + const fn = middlewares[i]; + if (!fn) { + await finalNext(); + return; + } + await fn(ctx, async () => dispatch(i + 1)); + }; + await dispatch(0); + }; + + const handler = vi.fn(); + await runMiddlewareChain( + { + update: { + update_id: 301, + message: { chat: { id: 1 }, message_id: 10 }, + }, + }, + async () => { + handler(); + }, + ); + + const replayHandler = vi.fn(); + await runMiddlewareChain( + { + update: { + update_id: 301, + message: { chat: { id: 1 }, message_id: 11 }, + }, + }, + async () => { + replayHandler(); + }, + ); + + await flushTelegramTestMicrotasks(); + expect(onUpdateId).toHaveBeenCalledWith(301); + expect(handler).toHaveBeenCalledTimes(1); + expect(replayHandler).not.toHaveBeenCalled(); }); it("allows distinct callback_query ids without update_id", async () => { loadConfig.mockReturnValue({ diff --git a/scripts/copy-bundled-plugin-metadata.mjs b/scripts/copy-bundled-plugin-metadata.mjs index 1f83bf9d84b..a7eb8f4098a 100644 --- a/scripts/copy-bundled-plugin-metadata.mjs +++ b/scripts/copy-bundled-plugin-metadata.mjs @@ -1,6 +1,7 @@ import fs from "node:fs"; import path from "node:path"; import { pathToFileURL } from "node:url"; +import JSON5 from "json5"; import { NON_PACKAGED_BUNDLED_PLUGIN_DIRS } from "./lib/bundled-plugin-build-entries.mjs"; import { shouldBuildBundledCluster } from "./lib/optional-bundled-clusters.mjs"; import { @@ -10,6 +11,8 @@ import { } from "./runtime-postbuild-shared.mjs"; const GENERATED_BUNDLED_SKILLS_DIR = "bundled-skills"; +const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA_PATH = + "src/config/bundled-channel-config-metadata.generated.ts"; const TRANSIENT_COPY_ERROR_CODES = new Set(["EEXIST", "ENOENT", "ENOTEMPTY", "EBUSY"]); const COPY_RETRY_DELAYS_MS = [10, 25, 50]; @@ -217,6 +220,86 @@ function copyDeclaredPluginSkillPaths(params) { return copiedSkills; } +function readGeneratedBundledChannelConfigs(repoRoot) { + const metadataPath = path.join(repoRoot, GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA_PATH); + if (!fs.existsSync(metadataPath)) { + return new Map(); + } + const source = fs.readFileSync(metadataPath, "utf8"); + const match = source.match( + /export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = ([\s\S]*?) as const;/u, + ); + if (!match?.[1]) { + return new Map(); + } + let entries; + try { + entries = JSON5.parse(match[1]); + } catch { + return new Map(); + } + if (!Array.isArray(entries)) { + return new Map(); + } + const byPlugin = new Map(); + for (const entry of entries) { + if ( + !entry || + typeof entry !== "object" || + typeof entry.pluginId !== "string" || + typeof entry.channelId !== "string" || + !entry.schema || + typeof entry.schema !== "object" + ) { + continue; + } + const pluginConfigs = byPlugin.get(entry.pluginId) ?? {}; + pluginConfigs[entry.channelId] = { + schema: entry.schema, + ...(typeof entry.label === "string" && entry.label ? { label: entry.label } : {}), + ...(typeof entry.description === "string" && entry.description + ? { description: entry.description } + : {}), + ...(entry.uiHints && typeof entry.uiHints === "object" ? { uiHints: entry.uiHints } : {}), + }; + byPlugin.set(entry.pluginId, pluginConfigs); + } + return byPlugin; +} + +function mergeGeneratedChannelConfigs(manifest, generatedChannelConfigs) { + if (!generatedChannelConfigs || Object.keys(generatedChannelConfigs).length === 0) { + return manifest; + } + const existingChannelConfigs = + manifest.channelConfigs && typeof manifest.channelConfigs === "object" + ? manifest.channelConfigs + : {}; + const channelConfigs = { ...existingChannelConfigs }; + for (const [channelId, generated] of Object.entries(generatedChannelConfigs)) { + const existing = + existingChannelConfigs[channelId] && typeof existingChannelConfigs[channelId] === "object" + ? existingChannelConfigs[channelId] + : {}; + channelConfigs[channelId] = { + ...generated, + ...existing, + schema: generated.schema, + ...(generated.uiHints || existing.uiHints + ? { uiHints: { ...generated.uiHints, ...existing.uiHints } } + : {}), + ...(existing.label || generated.label ? { label: existing.label ?? generated.label } : {}), + ...(existing.description || generated.description + ? { description: existing.description ?? generated.description } + : {}), + }; + } + return { + ...manifest, + channelConfigs, + }; +} + /** * @param {{ * cwd?: string; @@ -233,6 +316,7 @@ export function copyBundledPluginMetadata(params = {}) { return; } + const generatedChannelConfigsByPlugin = readGeneratedBundledChannelConfigs(repoRoot); const sourcePluginDirs = new Set(); for (const dirent of fs.readdirSync(extensionsRoot, { withFileTypes: true })) { if (!dirent.isDirectory()) { @@ -275,18 +359,22 @@ export function copyBundledPluginMetadata(params = {}) { if (fs.existsSync(manifestPath)) { const manifest = JSON.parse(fs.readFileSync(manifestPath, "utf8")); + const manifestWithGeneratedChannelConfigs = mergeGeneratedChannelConfigs( + manifest, + generatedChannelConfigsByPlugin.get(manifest.id), + ); // Generated skill assets live under a dedicated dist-owned directory. Runtime // dependency staging owns dist plugin node_modules; do not remove it here. removePathIfExists(path.join(distPluginDir, GENERATED_BUNDLED_SKILLS_DIR)); const copiedSkills = copyDeclaredPluginSkillPaths({ - manifest, + manifest: manifestWithGeneratedChannelConfigs, pluginDir, distPluginDir, repoRoot, }); - const bundledManifest = Array.isArray(manifest.skills) - ? { ...manifest, skills: copiedSkills } - : manifest; + const bundledManifest = Array.isArray(manifestWithGeneratedChannelConfigs.skills) + ? { ...manifestWithGeneratedChannelConfigs, skills: copiedSkills } + : manifestWithGeneratedChannelConfigs; writeTextFileIfChanged(distManifestPath, `${JSON.stringify(bundledManifest, null, 2)}\n`); } else { removeFileIfExists(distManifestPath); diff --git a/src/plugins/copy-bundled-plugin-metadata.test.ts b/src/plugins/copy-bundled-plugin-metadata.test.ts index 16638a84db4..a520f5e2e93 100644 --- a/src/plugins/copy-bundled-plugin-metadata.test.ts +++ b/src/plugins/copy-bundled-plugin-metadata.test.ts @@ -45,13 +45,13 @@ function createPlugin( return pluginDir; } -function readBundledManifest(repoRoot: string, pluginId: string) { +function readBundledManifest(repoRoot: string, pluginId: string): Record { return JSON.parse( fs.readFileSync( path.join(repoRoot, "dist", "extensions", pluginId, "openclaw.plugin.json"), "utf8", ), - ) as { skills?: string[] }; + ) as Record; } function readBundledPackageJson(repoRoot: string, pluginId: string) { @@ -126,6 +126,70 @@ describe("copyBundledPluginMetadata", () => { expect(packageJson.openclaw?.extensions).toEqual(["./index.js"]); }); + it("copies generated bundled channel config schemas into dist manifests", () => { + const repoRoot = makeRepoRoot("openclaw-bundled-channel-config-meta-"); + createPlugin(repoRoot, { + id: "telegram", + packageName: "@openclaw/telegram", + manifest: { + channels: ["telegram"], + channelConfigs: { + telegram: { + schema: { type: "object", properties: { stale: { type: "boolean" } } }, + uiHints: { + "channels.telegram.stale": { help: "stale hint" }, + }, + }, + }, + }, + packageOpenClaw: { extensions: ["./index.ts"] }, + }); + fs.mkdirSync(path.join(repoRoot, "src", "config"), { recursive: true }); + fs.writeFileSync( + path.join(repoRoot, "src", "config", "bundled-channel-config-metadata.generated.ts"), + [ + "// generated test fixture", + "export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [", + " {", + ' pluginId: "telegram",', + ' channelId: "telegram",', + ' label: "Telegram",', + " schema: {", + ' type: "object",', + " properties: {", + ' groups: { type: "object" }', + " }", + " },", + " uiHints: {", + ' "channels.telegram.groups": { help: "generated hint" }', + " }", + " }", + "] as const;", + "", + ].join("\n"), + "utf8", + ); + + copyBundledPluginMetadata({ repoRoot }); + + const manifest = readBundledManifest(repoRoot, "telegram"); + expect(manifest.channelConfigs).toEqual({ + telegram: { + schema: { + type: "object", + properties: { + groups: { type: "object" }, + }, + }, + label: "Telegram", + uiHints: { + "channels.telegram.groups": { help: "generated hint" }, + "channels.telegram.stale": { help: "stale hint" }, + }, + }, + }); + }); + it("relocates node_modules-backed skill paths into bundled-skills and rewrites the manifest", () => { const repoRoot = makeRepoRoot("openclaw-bundled-plugin-node-modules-"); const pluginDir = createTlonSkillPlugin(repoRoot);