From e8191e5b8f65f0e0544182e5ce193e096aa3354c Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 25 Apr 2026 08:22:40 +0100 Subject: [PATCH] fix: ack Telegram webhooks before update handling --- CHANGELOG.md | 1 + docs/channels/telegram.md | 3 + extensions/telegram/src/webhook.test.ts | 196 ++++++++++-------------- extensions/telegram/src/webhook.ts | 61 +++----- 4 files changed, 110 insertions(+), 151 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 744552c05c6..4afd7431448 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Telegram/webhook: acknowledge validated webhook updates before running bot middleware, keeping slow agent turns from tripping Telegram delivery retries while preserving per-chat processing lanes. Fixes #71392. - MCP: retire one-shot embedded bundled MCP runtimes at run end, skip bundle-MCP startup when a runtime tool allowlist cannot reach bundle-MCP tools, and add `mcp.sessionIdleTtlMs` idle eviction for leaked session runtimes. Fixes #71106, #71110, #70389, and #70808. - Gateway/restart continuation: durably hand restart continuations to a session-delivery queue before deleting the restart sentinel, recover queued continuation work after crashy restarts, and fall back to a session-only wake when no channel route survives reboot. (#70780) Thanks @fuller-stack-dev. - Agents/tool-result pruning: harden the tool-result character estimator and context-pruning loops against malformed `{ type: "text" }` blocks created by void or undefined tool handler results, serializing non-string text payloads for size accounting so they cannot bypass trimming as zero-sized. Fixes #34979. (#51267) Thanks @cgdusek. diff --git a/docs/channels/telegram.md b/docs/channels/telegram.md index 50c0d1bc773..a787e3a82b3 100644 --- a/docs/channels/telegram.md +++ b/docs/channels/telegram.md @@ -704,6 +704,9 @@ curl "https://api.telegram.org/bot/getUpdates" The local listener binds to `127.0.0.1:8787`. For public ingress, either put a reverse proxy in front of the local port or set `webhookHost: "0.0.0.0"` intentionally. + Webhook mode validates request guards, the Telegram secret token, and the JSON body before returning `200` to Telegram. + OpenClaw then processes the update asynchronously through the same per-chat/per-topic bot lanes used by long polling, so slow agent turns do not hold Telegram's delivery ACK. + diff --git a/extensions/telegram/src/webhook.test.ts b/extensions/telegram/src/webhook.test.ts index eeb948e79ab..d89cc01f91a 100644 --- a/extensions/telegram/src/webhook.test.ts +++ b/extensions/telegram/src/webhook.test.ts @@ -5,15 +5,15 @@ import { setTimeout as sleep } from "node:timers/promises"; import { WEBHOOK_RATE_LIMIT_DEFAULTS } from "openclaw/plugin-sdk/webhook-ingress"; import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; -const handlerSpy = vi.hoisted(() => vi.fn((..._args: unknown[]): unknown => undefined)); +const handleUpdateSpy = vi.hoisted(() => vi.fn((..._args: unknown[]): unknown => undefined)); const setWebhookSpy = vi.hoisted(() => vi.fn()); const deleteWebhookSpy = vi.hoisted(() => vi.fn(async () => true)); const initSpy = vi.hoisted(() => vi.fn(async () => undefined)); const stopSpy = vi.hoisted(() => vi.fn()); -const webhookCallbackSpy = vi.hoisted(() => vi.fn(() => handlerSpy)); const createTelegramBotSpy = vi.hoisted(() => vi.fn(() => ({ init: initSpy, + handleUpdate: handleUpdateSpy, api: { setWebhook: setWebhookSpy, deleteWebhook: deleteWebhookSpy }, stop: stopSpy, })), @@ -90,7 +90,6 @@ vi.mock("grammy", async () => { class GrammyError extends Error { description = ""; }, - webhookCallback: webhookCallbackSpy, }; }); @@ -101,8 +100,8 @@ vi.mock("./bot.js", () => ({ let startTelegramWebhook: typeof import("./webhook.js").startTelegramWebhook; function resetTelegramWebhookMocks(): void { - handlerSpy.mockReset(); - handlerSpy.mockImplementation((..._args: unknown[]): unknown => undefined); + handleUpdateSpy.mockReset(); + handleUpdateSpy.mockImplementation((..._args: unknown[]): unknown => undefined); setWebhookSpy.mockReset(); deleteWebhookSpy.mockReset(); @@ -110,11 +109,10 @@ function resetTelegramWebhookMocks(): void { initSpy.mockReset(); initSpy.mockImplementation(async () => undefined); stopSpy.mockReset(); - webhookCallbackSpy.mockReset(); - webhookCallbackSpy.mockImplementation(() => handlerSpy); createTelegramBotSpy.mockReset(); createTelegramBotSpy.mockImplementation(() => ({ init: initSpy, + handleUpdate: handleUpdateSpy, api: { setWebhook: setWebhookSpy, deleteWebhook: deleteWebhookSpy }, stop: stopSpy, })); @@ -379,20 +377,9 @@ function expectSingleNearLimitUpdate(params: { async function runNearLimitPayloadTest(mode: "single" | "random-chunked"): Promise { const seenUpdates: Array<{ update_id: number; message: { text: string } }> = []; - webhookCallbackSpy.mockImplementationOnce( - () => - vi.fn( - ( - update: unknown, - reply: (json: string) => Promise, - _secretHeader: string | undefined, - _unauthorized: () => Promise, - ) => { - seenUpdates.push(update as { update_id: number; message: { text: string } }); - void reply("ok"); - }, - ) as unknown as typeof handlerSpy, - ); + handleUpdateSpy.mockImplementationOnce((update: unknown) => { + seenUpdates.push(update as { update_id: number; message: { text: string } }); + }); const { payload, sizeBytes } = createNearLimitTelegramPayload(); expect(sizeBytes).toBeLessThan(1_024 * 1_024); @@ -415,7 +402,7 @@ async function runNearLimitPayloadTest(mode: "single" | "random-chunked"): Promi }); expect(response.statusCode).toBe(200); - expectSingleNearLimitUpdate({ seenUpdates, expected }); + await vi.waitFor(() => expectSingleNearLimitUpdate({ seenUpdates, expected })); }, ); } @@ -424,7 +411,6 @@ describe("startTelegramWebhook", () => { it("starts server, registers webhook, and serves health", async () => { initSpy.mockClear(); createTelegramBotSpy.mockClear(); - webhookCallbackSpy.mockClear(); const runtimeLog = vi.fn(); const cfg = { bindings: [] }; await withStartedWebhook( @@ -445,19 +431,6 @@ describe("startTelegramWebhook", () => { expect(health.status).toBe(200); expect(initSpy).toHaveBeenCalledTimes(1); expect(setWebhookSpy).toHaveBeenCalled(); - expect(webhookCallbackSpy).toHaveBeenCalledWith( - expect.objectContaining({ - api: expect.objectContaining({ - setWebhook: expect.any(Function), - }), - }), - "callback", - { - secretToken: TELEGRAM_SECRET, - onTimeout: "return", - timeoutMilliseconds: 5_000, - }, - ); expect(runtimeLog).toHaveBeenCalledWith( expect.stringContaining("webhook local listener on http://127.0.0.1:"), ); @@ -503,7 +476,7 @@ describe("startTelegramWebhook", () => { }); it("invokes webhook handler on matching path", async () => { - handlerSpy.mockClear(); + handleUpdateSpy.mockClear(); createTelegramBotSpy.mockClear(); const cfg = { bindings: [] }; await withStartedWebhook( @@ -527,37 +500,23 @@ describe("startTelegramWebhook", () => { secret: TELEGRAM_SECRET, }); expect(response.status).toBe(200); - expect(handlerSpy).toHaveBeenCalledWith( - JSON.parse(payload), - expect.any(Function), - TELEGRAM_SECRET, - expect.any(Function), - ); + await vi.waitFor(() => expect(handleUpdateSpy).toHaveBeenCalledWith(JSON.parse(payload))); }, ); }); - it("returns after grammY timeout reply while webhook work continues", async () => { + it("acks before webhook update processing finishes", async () => { let finishWork: (() => void) | undefined; + let workStarted = false; let workFinished = false; - webhookCallbackSpy.mockImplementationOnce( - () => - vi.fn( - async ( - update: unknown, - reply: (json: string) => Promise, - _secretHeader: string | undefined, - _unauthorized: () => Promise, - ) => { - expect(update).toEqual({ update_id: 2, message: { text: "slow" } }); - await reply("{}"); - await new Promise((resolve) => { - finishWork = resolve; - }); - workFinished = true; - }, - ) as unknown as typeof handlerSpy, - ); + handleUpdateSpy.mockImplementationOnce(async (update: unknown) => { + expect(update).toEqual({ update_id: 2, message: { text: "slow" } }); + workStarted = true; + await new Promise((resolve) => { + finishWork = resolve; + }); + workFinished = true; + }); await withStartedWebhook( { @@ -565,14 +524,6 @@ describe("startTelegramWebhook", () => { path: TELEGRAM_WEBHOOK_PATH, }, async ({ port }) => { - expect(webhookCallbackSpy).toHaveBeenCalledWith( - expect.anything(), - "callback", - expect.objectContaining({ - onTimeout: "return", - timeoutMilliseconds: 5_000, - }), - ); const response = await postWebhookJson({ url: webhookUrl(port, TELEGRAM_WEBHOOK_PATH), payload: JSON.stringify({ update_id: 2, message: { text: "slow" } }), @@ -581,18 +532,48 @@ describe("startTelegramWebhook", () => { }); expect(response.status).toBe(200); - expect(await response.text()).toBe("{}"); + expect(await response.text()).toBe(""); + await vi.waitFor(() => expect(workStarted).toBe(true)); expect(workFinished).toBe(false); finishWork?.(); - await sleep(WEBHOOK_TEST_YIELD_MS); - expect(workFinished).toBe(true); + await vi.waitFor(() => expect(workFinished).toBe(true)); + }, + ); + }); + + it("logs update processing failures after acknowledging Telegram", async () => { + const runtimeLog = vi.fn(); + handleUpdateSpy.mockRejectedValueOnce(new Error("agent turn failed")); + + await withStartedWebhook( + { + secret: TELEGRAM_SECRET, + path: TELEGRAM_WEBHOOK_PATH, + runtime: { log: runtimeLog, error: vi.fn(), exit: vi.fn() }, + }, + async ({ port }) => { + const response = await postWebhookJson({ + url: webhookUrl(port, TELEGRAM_WEBHOOK_PATH), + payload: JSON.stringify({ update_id: 3, message: { text: "boom" } }), + secret: TELEGRAM_SECRET, + }); + + expect(response.status).toBe(200); + expect(await response.text()).toBe(""); + await vi.waitFor(() => + expect(runtimeLog).toHaveBeenCalledWith( + expect.stringContaining( + "webhook update processing failed after ack: agent turn failed", + ), + ), + ); }, ); }); it("rejects unauthenticated requests before reading the request body", async () => { - handlerSpy.mockClear(); + handleUpdateSpy.mockClear(); await withStartedWebhook( { secret: TELEGRAM_SECRET, @@ -608,13 +589,13 @@ describe("startTelegramWebhook", () => { expect(response.statusCode).toBe(401); expect(response.body).toBe("unauthorized"); - expect(handlerSpy).not.toHaveBeenCalled(); + expect(handleUpdateSpy).not.toHaveBeenCalled(); }, ); }); it("rate limits repeated invalid secret guesses before authentication succeeds", async () => { - handlerSpy.mockClear(); + handleUpdateSpy.mockClear(); await withStartedWebhook( { secret: TELEGRAM_SECRET, @@ -649,13 +630,13 @@ describe("startTelegramWebhook", () => { }); expect(validResponse.status).toBe(429); expect(await validResponse.text()).toBe("Too Many Requests"); - expect(handlerSpy).not.toHaveBeenCalled(); + expect(handleUpdateSpy).not.toHaveBeenCalled(); }, ); }); it("uses the forwarded client ip when trusted proxies are configured", async () => { - handlerSpy.mockClear(); + handleUpdateSpy.mockClear(); await withStartedWebhook( { secret: TELEGRAM_SECRET, @@ -702,13 +683,13 @@ describe("startTelegramWebhook", () => { ); expect(isolatedClient.status).toBe(200); - expect(handlerSpy).toHaveBeenCalledTimes(1); + await vi.waitFor(() => expect(handleUpdateSpy).toHaveBeenCalledTimes(1)); }, ); }); it("keeps rate-limit state isolated per webhook listener", async () => { - handlerSpy.mockClear(); + handleUpdateSpy.mockClear(); const firstAbort = new AbortController(); const secondAbort = new AbortController(); const first = await startTelegramWebhook({ @@ -748,7 +729,7 @@ describe("startTelegramWebhook", () => { }); expect(secondResponse.status).toBe(200); - expect(handlerSpy).toHaveBeenCalledTimes(1); + await vi.waitFor(() => expect(handleUpdateSpy).toHaveBeenCalledTimes(1)); } finally { firstAbort.abort(); secondAbort.abort(); @@ -788,11 +769,11 @@ describe("startTelegramWebhook", () => { ); }); - it("keeps webhook payload readable when callback delays body read", async () => { - handlerSpy.mockImplementationOnce(async (...args: unknown[]) => { - const [update, reply] = args as [unknown, (json: string) => Promise]; + it("keeps webhook payload readable when update processing is delayed", async () => { + let seenUpdate: unknown; + handleUpdateSpy.mockImplementationOnce(async (update: unknown) => { await sleep(WEBHOOK_TEST_YIELD_MS); - await reply(JSON.stringify(update)); + seenUpdate = update; }); await withStartedWebhook( @@ -808,21 +789,19 @@ describe("startTelegramWebhook", () => { secret: TELEGRAM_SECRET, }); expect(res.status).toBe(200); - const responseBody = await res.text(); - expect(JSON.parse(responseBody)).toEqual(JSON.parse(payload)); + expect(await res.text()).toBe(""); + await vi.waitFor(() => expect(seenUpdate).toEqual(JSON.parse(payload))); }, ); }); it("keeps webhook payload readable across multiple delayed reads", async () => { const seenPayloads: string[] = []; - const delayedHandler = async (...args: unknown[]) => { - const [update, reply] = args as [unknown, (json: string) => Promise]; + const delayedHandler = async (update: unknown) => { await sleep(WEBHOOK_TEST_YIELD_MS); seenPayloads.push(JSON.stringify(update)); - await reply("ok"); }; - handlerSpy.mockImplementationOnce(delayedHandler).mockImplementationOnce(delayedHandler); + handleUpdateSpy.mockImplementationOnce(delayedHandler).mockImplementationOnce(delayedHandler); await withStartedWebhook( { @@ -844,30 +823,21 @@ describe("startTelegramWebhook", () => { expect(res.status).toBe(200); } - expect(seenPayloads.map((x) => JSON.parse(x))).toEqual(payloads.map((x) => JSON.parse(x))); + await vi.waitFor(() => + expect(seenPayloads.map((x) => JSON.parse(x))).toEqual( + payloads.map((x) => JSON.parse(x)), + ), + ); }, ); }); it("processes a second request after first-request delayed-init data loss", async () => { const seenUpdates: unknown[] = []; - webhookCallbackSpy.mockImplementationOnce( - () => - vi.fn( - ( - update: unknown, - reply: (json: string) => Promise, - _secretHeader: string | undefined, - _unauthorized: () => Promise, - ) => { - seenUpdates.push(update); - void (async () => { - await sleep(WEBHOOK_TEST_YIELD_MS); - await reply("ok"); - })(); - }, - ) as unknown as typeof handlerSpy, - ); + handleUpdateSpy.mockImplementation(async (update: unknown) => { + await sleep(WEBHOOK_TEST_YIELD_MS); + seenUpdates.push(update); + }); await withStartedWebhook( { @@ -896,7 +866,9 @@ describe("startTelegramWebhook", () => { expect(firstResponse.statusCode).toBe(200); expect(secondResponse.statusCode).toBe(200); - expect(seenUpdates).toEqual([JSON.parse(firstPayload), JSON.parse(secondPayload)]); + await vi.waitFor(() => + expect(seenUpdates).toEqual([JSON.parse(firstPayload), JSON.parse(secondPayload)]), + ); }, ); }); @@ -910,7 +882,7 @@ describe("startTelegramWebhook", () => { }); it("rejects payloads larger than 1MB before invoking webhook handler", async () => { - handlerSpy.mockClear(); + handleUpdateSpy.mockClear(); await withStartedWebhook( { secret: TELEGRAM_SECRET, @@ -951,7 +923,7 @@ describe("startTelegramWebhook", () => { } else { expect(responseOrError.code).toBeOneOf(["ECONNRESET", "EPIPE"]); } - expect(handlerSpy).not.toHaveBeenCalled(); + expect(handleUpdateSpy).not.toHaveBeenCalled(); }, ); }); diff --git a/extensions/telegram/src/webhook.ts b/extensions/telegram/src/webhook.ts index 295c686add0..ec6e2720071 100644 --- a/extensions/telegram/src/webhook.ts +++ b/extensions/telegram/src/webhook.ts @@ -28,8 +28,6 @@ import { createTelegramBot } from "./bot.js"; const TELEGRAM_WEBHOOK_MAX_BODY_BYTES = 1024 * 1024; const TELEGRAM_WEBHOOK_BODY_TIMEOUT_MS = 30_000; -// Keep below Telegram/proxy read timeouts; grammY returns early while the handler continues. -const TELEGRAM_WEBHOOK_CALLBACK_TIMEOUT_MS = 5_000; const InputFileCtor: typeof grammy.InputFile = typeof grammy.InputFile === "function" ? grammy.InputFile @@ -278,12 +276,6 @@ export async function startTelegramWebhook(opts: { maxRequests: WEBHOOK_RATE_LIMIT_DEFAULTS.maxRequests, maxTrackedKeys: WEBHOOK_RATE_LIMIT_DEFAULTS.maxTrackedKeys, }); - const handler = grammy.webhookCallback(bot, "callback", { - secretToken: secret, - onTimeout: "return", - timeoutMilliseconds: TELEGRAM_WEBHOOK_CALLBACK_TIMEOUT_MS, - }); - if (diagnosticsEnabled) { startDiagnosticHeartbeat(opts.config); } @@ -353,38 +345,29 @@ export async function startTelegramWebhook(opts: { return; } - let replied = false; - const reply = async (json: string) => { - if (replied) { - return; - } - replied = true; - if (res.headersSent || res.writableEnded) { - return; - } - res.writeHead(200, { "Content-Type": "application/json; charset=utf-8" }); - res.end(json); - }; - const unauthorized = async () => { - if (replied) { - return; - } - replied = true; - respondText(401, "unauthorized"); - }; + respondText(200); - await handler(body.value, reply, secretHeader, unauthorized); - if (!replied) { - respondText(200); - } + void (async () => { + await bot.handleUpdate(body.value as Parameters[0]); - if (diagnosticsEnabled) { - logWebhookProcessed({ - channel: "telegram", - updateType: "telegram-post", - durationMs: Date.now() - startTime, - }); - } + if (diagnosticsEnabled) { + logWebhookProcessed({ + channel: "telegram", + updateType: "telegram-post", + durationMs: Date.now() - startTime, + }); + } + })().catch((err) => { + const errMsg = formatErrorMessage(err); + if (diagnosticsEnabled) { + logWebhookError({ + channel: "telegram", + updateType: "telegram-post", + error: errMsg, + }); + } + runtime.log?.(`webhook update processing failed after ack: ${errMsg}`); + }); })().catch((err) => { const errMsg = formatErrorMessage(err); if (diagnosticsEnabled) { @@ -394,7 +377,7 @@ export async function startTelegramWebhook(opts: { error: errMsg, }); } - runtime.log?.(`webhook handler failed: ${errMsg}`); + runtime.log?.(`webhook request failed: ${errMsg}`); respondText(500); }); });