From b09e11bc6983ac5c282645e7fc17fd9eb0721c4a Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 17 May 2026 00:44:31 +0100 Subject: [PATCH] fix: harden telegram routing edge cases --- .../telegram/src/action-runtime.test.ts | 42 +++++++----- extensions/telegram/src/action-runtime.ts | 9 +++ extensions/telegram/src/bot-core.ts | 4 ++ .../telegram/src/bot-update-tracker.test.ts | 32 +++++++++ extensions/telegram/src/bot-update-tracker.ts | 17 ++++- extensions/telegram/src/bot.types.ts | 1 + .../telegram/src/polling-session.test.ts | 68 ++++++++++++++++++- extensions/telegram/src/polling-session.ts | 5 +- extensions/telegram/src/send.ts | 15 ++++ 9 files changed, 172 insertions(+), 21 deletions(-) diff --git a/extensions/telegram/src/action-runtime.test.ts b/extensions/telegram/src/action-runtime.test.ts index ee6f1cbdafd..a050d3c9a18 100644 --- a/extensions/telegram/src/action-runtime.test.ts +++ b/extensions/telegram/src/action-runtime.test.ts @@ -367,6 +367,7 @@ describe("handleTelegramAction", () => { content: "Hello, Telegram!", }, telegramConfig(), + { gatewayClientScopes: ["operator.write"] }, ); const call = mockCall(sendMessageTelegram, 0, "text message"); expect(call[0]).toBe("@testchannel"); @@ -550,6 +551,7 @@ describe("handleTelegramAction", () => { media: "https://example.com/image.jpg", }, telegramConfig(), + { gatewayClientScopes: ["operator.write"] }, ); const call = mockCall(sendMessageTelegram, 0, "send alias"); expect(call[0]).toBe("@testchannel"); @@ -771,22 +773,28 @@ describe("handleTelegramAction", () => { readCallOpts: (calls: unknown[][], argIndex: number) => Record, ) => readCallOpts(editForumTopicTelegram.mock.calls as unknown[][], 2), }, - ])("forwards resolved cfg for $name action", async ({ params, cfg, assertCall }) => { - const readCallOpts = (calls: unknown[][], argIndex: number): Record => { - const args = calls[0]; - if (!Array.isArray(args)) { - throw new Error("Expected Telegram action call args"); - } - const opts = args[argIndex]; - if (!opts || typeof opts !== "object") { - throw new Error("Expected Telegram action options object"); - } - return opts as Record; - }; - await handleTelegramAction(params as Record, cfg); - const opts = assertCall(readCallOpts); - expect(opts.cfg).toBe(cfg); - }); + ])( + "forwards resolved cfg and gateway scopes for $name action", + async ({ params, cfg, assertCall }) => { + const readCallOpts = (calls: unknown[][], argIndex: number): Record => { + const args = calls[0]; + if (!Array.isArray(args)) { + throw new Error("Expected Telegram action call args"); + } + const opts = args[argIndex]; + if (!opts || typeof opts !== "object") { + throw new Error("Expected Telegram action options object"); + } + return opts as Record; + }; + await handleTelegramAction(params as Record, cfg, { + gatewayClientScopes: ["operator.write"], + }); + const opts = assertCall(readCallOpts); + expect(opts.cfg).toBe(cfg); + expect(opts.gatewayClientScopes).toEqual(["operator.write"]); + }, + ); it.each([ { @@ -908,6 +916,7 @@ describe("handleTelegramAction", () => { delivery: { pin: { enabled: true } }, }, telegramConfig(), + { gatewayClientScopes: ["operator.write"] }, ); const call = mockCall(pinMessageTelegram, 0, "delivery pin"); @@ -916,6 +925,7 @@ describe("handleTelegramAction", () => { const options = requireRecord(call[2], "delivery pin options"); expect(options.accountId).toBeUndefined(); expect(options.verbose).toBe(false); + expect(options.gatewayClientScopes).toEqual(["operator.write"]); }); it("passes delivery pin notify requests for action sends", async () => { diff --git a/extensions/telegram/src/action-runtime.ts b/extensions/telegram/src/action-runtime.ts index 0b63c30caaf..588fae68318 100644 --- a/extensions/telegram/src/action-runtime.ts +++ b/extensions/telegram/src/action-runtime.ts @@ -208,6 +208,7 @@ async function maybePinTelegramActionSend(params: { accountId?: string; to: string; messageId?: string; + gatewayClientScopes?: readonly string[]; }) { const pin = normalizeTelegramDeliveryPin(params.args); if (!pin) { @@ -225,6 +226,7 @@ async function maybePinTelegramActionSend(params: { accountId: params.accountId, notify: pin.notify, verbose: false, + gatewayClientScopes: params.gatewayClientScopes, }); } catch (err) { if (pin.required) { @@ -316,6 +318,7 @@ export async function handleTelegramAction( token, remove, accountId: accountId ?? undefined, + gatewayClientScopes: options?.gatewayClientScopes, }, ); } catch (err) { @@ -424,6 +427,7 @@ export async function handleTelegramAction( accountId: accountId ?? undefined, to, messageId: result.messageId, + gatewayClientScopes: options?.gatewayClientScopes, }); return jsonResult({ ok: true, @@ -524,6 +528,7 @@ export async function handleTelegramAction( cfg, token, accountId: accountId ?? undefined, + gatewayClientScopes: options?.gatewayClientScopes, }); if (!result.ok) { return jsonResult({ ok: false, deleted: false, warning: result.warning }); @@ -570,6 +575,7 @@ export async function handleTelegramAction( token, accountId: accountId ?? undefined, buttons, + gatewayClientScopes: options?.gatewayClientScopes, }, ); return jsonResult({ @@ -606,6 +612,7 @@ export async function handleTelegramAction( accountId: accountId ?? undefined, replyToMessageId: replyToMessageId ?? undefined, messageThreadId: messageThreadId ?? undefined, + gatewayClientScopes: options?.gatewayClientScopes, }); notifyVisibleOutboundSuccess(to, messageThreadId); return jsonResult({ @@ -661,6 +668,7 @@ export async function handleTelegramAction( accountId: accountId ?? undefined, iconColor, iconCustomEmojiId: iconCustomEmojiId ?? undefined, + gatewayClientScopes: options?.gatewayClientScopes, }); return jsonResult({ ok: true, @@ -696,6 +704,7 @@ export async function handleTelegramAction( accountId: accountId ?? undefined, name: name ?? undefined, iconCustomEmojiId: iconCustomEmojiId ?? undefined, + gatewayClientScopes: options?.gatewayClientScopes, }, ); return jsonResult(result); diff --git a/extensions/telegram/src/bot-core.ts b/extensions/telegram/src/bot-core.ts index de791594b04..b035253f7dd 100644 --- a/extensions/telegram/src/bot-core.ts +++ b/extensions/telegram/src/bot-core.ts @@ -154,6 +154,10 @@ export function createTelegramBotCore( }; const updateTracker = createTelegramUpdateTracker({ initialUpdateId, + persistenceFloorUpdateId: + typeof opts.updateOffset?.persistenceFloorUpdateId === "number" + ? opts.updateOffset.persistenceFloorUpdateId + : initialUpdateId, ackPolicy: "after_agent_dispatch", ...(typeof opts.updateOffset?.onUpdateId === "function" ? { onAcceptedUpdateId: opts.updateOffset.onUpdateId } diff --git a/extensions/telegram/src/bot-update-tracker.test.ts b/extensions/telegram/src/bot-update-tracker.test.ts index 75bfffc9e4f..43647113a57 100644 --- a/extensions/telegram/src/bot-update-tracker.test.ts +++ b/extensions/telegram/src/bot-update-tracker.test.ts @@ -133,6 +133,38 @@ describe("createTelegramUpdateTracker", () => { }); }); + it("can keep a persistence floor while replaying older spooled updates", async () => { + const onAcceptedUpdateId = vi.fn(); + const tracker = createTelegramUpdateTracker({ + initialUpdateId: null, + persistenceFloorUpdateId: 42, + ackPolicy: "after_agent_dispatch", + onAcceptedUpdateId, + }); + + const oldPending = tracker.beginUpdate(updateCtx(42)); + if (!oldPending.accepted) { + throw new Error("expected old spooled update to be accepted"); + } + tracker.finishUpdate(oldPending.update, { completed: false }); + + const newer = tracker.beginUpdate(updateCtx(43)); + if (!newer.accepted) { + throw new Error("expected newer update to be accepted"); + } + tracker.finishUpdate(newer.update, { completed: true }); + await flushTrackerMicrotasks(); + + expect(onAcceptedUpdateId).toHaveBeenCalledWith(43); + expectTrackerState(tracker.getState(), { + highestAcceptedUpdateId: 43, + highestPersistedAcceptedUpdateId: 43, + highestCompletedUpdateId: 43, + safeCompletedUpdateId: 43, + failedUpdateIds: [42], + } satisfies Partial); + }); + it("serializes and coalesces accepted offset persistence", async () => { const firstWrite = deferred(); const secondWrite = deferred(); diff --git a/extensions/telegram/src/bot-update-tracker.ts b/extensions/telegram/src/bot-update-tracker.ts index f016f6059e4..3fc10d20a94 100644 --- a/extensions/telegram/src/bot-update-tracker.ts +++ b/extensions/telegram/src/bot-update-tracker.ts @@ -14,6 +14,7 @@ type PersistUpdateId = (updateId: number) => void | Promise; type TelegramUpdateTrackerOptions = { initialUpdateId?: number | null; + persistenceFloorUpdateId?: number | null; ackPolicy?: MessageAckPolicy; onAcceptedUpdateId?: PersistUpdateId; onPersistError?: (error: unknown) => void; @@ -56,6 +57,10 @@ function sortedIds(ids: Set): number[] { export function createTelegramUpdateTracker(options: TelegramUpdateTrackerOptions = {}) { const initialUpdateId = typeof options.initialUpdateId === "number" ? options.initialUpdateId : null; + const persistenceFloorUpdateId = + typeof options.persistenceFloorUpdateId === "number" + ? options.persistenceFloorUpdateId + : initialUpdateId; const ackPolicy = options.ackPolicy ?? "after_receive_record"; const recentUpdates = createTelegramUpdateDedupe(); const pendingUpdateKeys = new Set(); @@ -63,9 +68,9 @@ export function createTelegramUpdateTracker(options: TelegramUpdateTrackerOption const pendingUpdateIds = new Set(); const failedUpdateIds = new Set(); let highestAcceptedUpdateId: number | null = initialUpdateId; - let highestPersistedAcceptedUpdateId: number | null = initialUpdateId; - let highestPersistenceRequestedUpdateId: number | null = initialUpdateId; - let highestCompletedUpdateId: number | null = initialUpdateId; + let highestPersistedAcceptedUpdateId: number | null = persistenceFloorUpdateId; + let highestPersistenceRequestedUpdateId: number | null = persistenceFloorUpdateId; + let highestCompletedUpdateId: number | null = persistenceFloorUpdateId; let persistInFlight = false; let persistTargetUpdateId: number | null = null; @@ -130,11 +135,17 @@ export function createTelegramUpdateTracker(options: TelegramUpdateTrackerOption } let safeCompletedUpdateId = highestCompletedUpdateId; for (const updateId of pendingUpdateIds) { + if (persistenceFloorUpdateId !== null && updateId <= persistenceFloorUpdateId) { + continue; + } if (updateId <= safeCompletedUpdateId) { safeCompletedUpdateId = updateId - 1; } } for (const updateId of failedUpdateIds) { + if (persistenceFloorUpdateId !== null && updateId <= persistenceFloorUpdateId) { + continue; + } if (updateId <= safeCompletedUpdateId) { safeCompletedUpdateId = updateId - 1; } diff --git a/extensions/telegram/src/bot.types.ts b/extensions/telegram/src/bot.types.ts index 20d0ccb951b..98d41ae6dd5 100644 --- a/extensions/telegram/src/bot.types.ts +++ b/extensions/telegram/src/bot.types.ts @@ -23,6 +23,7 @@ export type TelegramBotOptions = { minimumClientTimeoutSeconds?: number; updateOffset?: { lastUpdateId?: number | null; + persistenceFloorUpdateId?: number | null; onUpdateId?: (updateId: number) => void | Promise; }; testTimings?: { diff --git a/extensions/telegram/src/polling-session.test.ts b/extensions/telegram/src/polling-session.test.ts index 14f0790c71a..9b26e5632e2 100644 --- a/extensions/telegram/src/polling-session.test.ts +++ b/extensions/telegram/src/polling-session.test.ts @@ -242,6 +242,7 @@ function createPollingSession(params: { log?: (message: string) => void; telegramTransport?: ReturnType; createTelegramTransport?: () => ReturnType; + getLastUpdateId?: () => number | null; stallThresholdMs?: number; setStatus?: (patch: Omit) => void; isolatedIngress?: ConstructorParameters[0]["isolatedIngress"]; @@ -254,7 +255,7 @@ function createPollingSession(params: { proxyFetch: undefined, abortSignal: params.abortSignal, runnerOptions: {}, - getLastUpdateId: () => null, + getLastUpdateId: params.getLastUpdateId ?? (() => null), persistUpdateId: async () => undefined, log: params.log ?? (() => undefined), telegramTransport: params.telegramTransport, @@ -587,6 +588,7 @@ describe("TelegramPollingSession", () => { ); expect(mockObjectArg(createTelegramBotMock, "createTelegramBot").updateOffset).toEqual({ lastUpdateId: null, + persistenceFloorUpdateId: null, onUpdateId: expect.any(Function), }); expect(init).toHaveBeenCalledBefore(handleUpdate); @@ -596,6 +598,70 @@ describe("TelegramPollingSession", () => { } }); + it("drains existing isolated ingress spool entries below the persisted offset", async () => { + const abort = new AbortController(); + const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-telegram-spool-")); + const handleUpdate = vi.fn(async () => undefined); + createTelegramBotMock.mockReturnValueOnce({ + api: { + deleteWebhook: vi.fn(async () => true), + config: { use: vi.fn() }, + }, + init: vi.fn(async () => undefined), + handleUpdate, + stop: vi.fn(async () => undefined), + }); + await writeTelegramSpooledUpdate({ + spoolDir: tempDir, + update: { update_id: 42, message: { text: "pre-upgrade pending" } }, + }); + let stopWorker: (() => void) | undefined; + const workerDone = new Promise((resolve) => { + stopWorker = resolve; + }); + const createWorker = vi.fn(() => ({ + onMessage: vi.fn(() => () => undefined), + stop: vi.fn(async () => { + stopWorker?.(); + }), + task: vi.fn(async () => { + await workerDone; + }), + })); + + try { + const session = createPollingSession({ + abortSignal: abort.signal, + getLastUpdateId: () => 42, + isolatedIngress: { + enabled: true, + spoolDir: tempDir, + createWorker, + drainIntervalMs: 10, + }, + }); + + const runPromise = session.runUntilAbort(); + await vi.waitFor(() => expect(handleUpdate).toHaveBeenCalledTimes(1)); + await vi.waitFor(async () => expect(await fs.readdir(tempDir)).toEqual([])); + abort.abort(); + await runPromise; + + expect(createWorker).toHaveBeenCalledWith(expect.objectContaining({ initialUpdateId: 42 })); + expect(mockObjectArg(createTelegramBotMock, "createTelegramBot").updateOffset).toEqual({ + lastUpdateId: null, + persistenceFloorUpdateId: 42, + onUpdateId: expect.any(Function), + }); + expect(handleUpdate).toHaveBeenCalledWith({ + update_id: 42, + message: { text: "pre-upgrade pending" }, + }); + } finally { + await fs.rm(tempDir, { recursive: true, force: true }); + } + }); + it("drains Telegram delivery queue after isolated ingress reports poll success", async () => { const abort = new AbortController(); const init = vi.fn(async () => undefined); diff --git a/extensions/telegram/src/polling-session.ts b/extensions/telegram/src/polling-session.ts index 2d1959308f8..9da531fd73d 100644 --- a/extensions/telegram/src/polling-session.ts +++ b/extensions/telegram/src/polling-session.ts @@ -279,8 +279,11 @@ export class TelegramPollingSession { const fetchAbortController = new AbortController(); this.#activeFetchAbort = fetchAbortController; const telegramTransport = this.#transportState.acquireForNextCycle(); + const persistedLastUpdateId = this.opts.getLastUpdateId(); + const lastUpdateId = this.opts.isolatedIngress?.enabled ? null : persistedLastUpdateId; const updateOffset = { - lastUpdateId: this.opts.getLastUpdateId(), + lastUpdateId, + persistenceFloorUpdateId: persistedLastUpdateId, onUpdateId: this.opts.persistUpdateId, }; try { diff --git a/extensions/telegram/src/send.ts b/extensions/telegram/src/send.ts index c9bdb901fb3..ae8f5aee66a 100644 --- a/extensions/telegram/src/send.ts +++ b/extensions/telegram/src/send.ts @@ -123,6 +123,7 @@ type TelegramReactionOpts = { remove?: boolean; verbose?: boolean; retry?: RetryConfig; + gatewayClientScopes?: readonly string[]; }; type TelegramTypingOpts = { @@ -1040,6 +1041,7 @@ export async function reactMessageTelegram( lookupTarget: rawTarget, persistTarget: rawTarget, verbose: opts.verbose, + gatewayClientScopes: opts.gatewayClientScopes, }); const messageId = normalizeMessageId(messageIdInput); const requestWithDiag = createTelegramRequestWithDiag({ @@ -1080,6 +1082,7 @@ type TelegramDeleteOpts = { verbose?: boolean; api?: TelegramApiOverride; retry?: RetryConfig; + gatewayClientScopes?: readonly string[]; }; export async function deleteMessageTelegram( @@ -1095,6 +1098,7 @@ export async function deleteMessageTelegram( lookupTarget: rawTarget, persistTarget: rawTarget, verbose: opts.verbose, + gatewayClientScopes: opts.gatewayClientScopes, }); const messageId = normalizeMessageId(messageIdInput); const requestWithDiag = createTelegramRequestWithDiag({ @@ -1136,6 +1140,7 @@ export async function pinMessageTelegram( lookupTarget: rawTarget, persistTarget: rawTarget, verbose: opts.verbose, + gatewayClientScopes: opts.gatewayClientScopes, }); const messageId = normalizeMessageId(messageIdInput); const requestWithDiag = createTelegramRequestWithDiag({ @@ -1168,6 +1173,7 @@ export async function unpinMessageTelegram( lookupTarget: rawTarget, persistTarget: rawTarget, verbose: opts.verbose, + gatewayClientScopes: opts.gatewayClientScopes, }); const messageId = messageIdInput === undefined ? undefined : normalizeMessageId(messageIdInput); const requestWithDiag = createTelegramRequestWithDiag({ @@ -1229,6 +1235,7 @@ export async function editForumTopicTelegram( lookupTarget: target.chatId, persistTarget: rawTarget, verbose: opts.verbose, + gatewayClientScopes: opts.gatewayClientScopes, }); const messageThreadId = normalizeMessageId(messageThreadIdInput); const requestWithDiag = createTelegramRequestWithDiag({ @@ -1279,6 +1286,7 @@ type TelegramEditOpts = { verbose?: boolean; api?: TelegramApiOverride; retry?: RetryConfig; + gatewayClientScopes?: readonly string[]; textMode?: "markdown" | "html"; /** Controls whether link previews are shown in the edited message. */ linkPreview?: boolean; @@ -1294,6 +1302,7 @@ type TelegramEditReplyMarkupOpts = { verbose?: boolean; api?: TelegramApiOverride; retry?: RetryConfig; + gatewayClientScopes?: readonly string[]; /** Inline keyboard buttons (reply markup). Pass empty array to remove buttons. */ buttons?: TelegramInlineButtons; /** Resolved runtime config from the command or gateway boundary. */ @@ -1317,6 +1326,7 @@ export async function editMessageReplyMarkupTelegram( lookupTarget: rawTarget, persistTarget: rawTarget, verbose: opts.verbose, + gatewayClientScopes: opts.gatewayClientScopes, }); const messageId = normalizeMessageId(messageIdInput); const requestWithDiag = createTelegramRequestWithDiag({ @@ -1360,6 +1370,7 @@ export async function editMessageTelegram( lookupTarget: rawTarget, persistTarget: rawTarget, verbose: opts.verbose, + gatewayClientScopes: opts.gatewayClientScopes, }); const messageId = normalizeMessageId(messageIdInput); const requestWithDiag = createTelegramRequestWithDiag({ @@ -1463,6 +1474,7 @@ type TelegramStickerOpts = { verbose?: boolean; api?: TelegramApiOverride; retry?: RetryConfig; + gatewayClientScopes?: readonly string[]; /** Message ID to reply to (for threading) */ replyToMessageId?: number; /** Forum topic thread ID (for forum supergroups) */ @@ -1492,6 +1504,7 @@ export async function sendStickerTelegram( lookupTarget: target.chatId, persistTarget: to, verbose: opts.verbose, + gatewayClientScopes: opts.gatewayClientScopes, }); const threadParams = buildTelegramThreadReplyParams({ @@ -1664,6 +1677,7 @@ type TelegramCreateForumTopicOpts = { api?: TelegramApiOverride; verbose?: boolean; retry?: RetryConfig; + gatewayClientScopes?: readonly string[]; /** Icon color for the topic (must be one of 0x6FB9F0, 0xFFD67E, 0xCB86DB, 0x8EEE98, 0xFF93B2, 0xFB6F5F). */ iconColor?: TelegramCreateForumTopicParams["icon_color"]; /** Custom emoji ID for the topic icon. */ @@ -1707,6 +1721,7 @@ export async function createForumTopicTelegram( lookupTarget: target.chatId, persistTarget: chatId, verbose: opts.verbose, + gatewayClientScopes: opts.gatewayClientScopes, }); const requestWithDiag = createTelegramNonIdempotentRequestWithDiag({