fix: ack Telegram webhooks before update handling

This commit is contained in:
Peter Steinberger
2026-04-25 08:22:40 +01:00
parent a44800e929
commit e8191e5b8f
4 changed files with 110 additions and 151 deletions

View File

@@ -13,6 +13,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Telegram/webhook: acknowledge validated webhook updates before running bot middleware, keeping slow agent turns from tripping Telegram delivery retries while preserving per-chat processing lanes. Fixes #71392.
- MCP: retire one-shot embedded bundled MCP runtimes at run end, skip bundle-MCP startup when a runtime tool allowlist cannot reach bundle-MCP tools, and add `mcp.sessionIdleTtlMs` idle eviction for leaked session runtimes. Fixes #71106, #71110, #70389, and #70808.
- Gateway/restart continuation: durably hand restart continuations to a session-delivery queue before deleting the restart sentinel, recover queued continuation work after crashy restarts, and fall back to a session-only wake when no channel route survives reboot. (#70780) Thanks @fuller-stack-dev.
- Agents/tool-result pruning: harden the tool-result character estimator and context-pruning loops against malformed `{ type: "text" }` blocks created by void or undefined tool handler results, serializing non-string text payloads for size accounting so they cannot bypass trimming as zero-sized. Fixes #34979. (#51267) Thanks @cgdusek.

View File

@@ -704,6 +704,9 @@ curl "https://api.telegram.org/bot<bot_token>/getUpdates"
The local listener binds to `127.0.0.1:8787`. For public ingress, either put a reverse proxy in front of the local port or set `webhookHost: "0.0.0.0"` intentionally.
Webhook mode validates request guards, the Telegram secret token, and the JSON body before returning `200` to Telegram.
OpenClaw then processes the update asynchronously through the same per-chat/per-topic bot lanes used by long polling, so slow agent turns do not hold Telegram's delivery ACK.
</Accordion>
<Accordion title="Limits, retry, and CLI targets">

View File

@@ -5,15 +5,15 @@ import { setTimeout as sleep } from "node:timers/promises";
import { WEBHOOK_RATE_LIMIT_DEFAULTS } from "openclaw/plugin-sdk/webhook-ingress";
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
const handlerSpy = vi.hoisted(() => vi.fn((..._args: unknown[]): unknown => undefined));
const handleUpdateSpy = vi.hoisted(() => vi.fn((..._args: unknown[]): unknown => undefined));
const setWebhookSpy = vi.hoisted(() => vi.fn());
const deleteWebhookSpy = vi.hoisted(() => vi.fn(async () => true));
const initSpy = vi.hoisted(() => vi.fn(async () => undefined));
const stopSpy = vi.hoisted(() => vi.fn());
const webhookCallbackSpy = vi.hoisted(() => vi.fn(() => handlerSpy));
const createTelegramBotSpy = vi.hoisted(() =>
vi.fn(() => ({
init: initSpy,
handleUpdate: handleUpdateSpy,
api: { setWebhook: setWebhookSpy, deleteWebhook: deleteWebhookSpy },
stop: stopSpy,
})),
@@ -90,7 +90,6 @@ vi.mock("grammy", async () => {
class GrammyError extends Error {
description = "";
},
webhookCallback: webhookCallbackSpy,
};
});
@@ -101,8 +100,8 @@ vi.mock("./bot.js", () => ({
let startTelegramWebhook: typeof import("./webhook.js").startTelegramWebhook;
function resetTelegramWebhookMocks(): void {
handlerSpy.mockReset();
handlerSpy.mockImplementation((..._args: unknown[]): unknown => undefined);
handleUpdateSpy.mockReset();
handleUpdateSpy.mockImplementation((..._args: unknown[]): unknown => undefined);
setWebhookSpy.mockReset();
deleteWebhookSpy.mockReset();
@@ -110,11 +109,10 @@ function resetTelegramWebhookMocks(): void {
initSpy.mockReset();
initSpy.mockImplementation(async () => undefined);
stopSpy.mockReset();
webhookCallbackSpy.mockReset();
webhookCallbackSpy.mockImplementation(() => handlerSpy);
createTelegramBotSpy.mockReset();
createTelegramBotSpy.mockImplementation(() => ({
init: initSpy,
handleUpdate: handleUpdateSpy,
api: { setWebhook: setWebhookSpy, deleteWebhook: deleteWebhookSpy },
stop: stopSpy,
}));
@@ -379,20 +377,9 @@ function expectSingleNearLimitUpdate(params: {
async function runNearLimitPayloadTest(mode: "single" | "random-chunked"): Promise<void> {
const seenUpdates: Array<{ update_id: number; message: { text: string } }> = [];
webhookCallbackSpy.mockImplementationOnce(
() =>
vi.fn(
(
update: unknown,
reply: (json: string) => Promise<void>,
_secretHeader: string | undefined,
_unauthorized: () => Promise<void>,
) => {
seenUpdates.push(update as { update_id: number; message: { text: string } });
void reply("ok");
},
) as unknown as typeof handlerSpy,
);
handleUpdateSpy.mockImplementationOnce((update: unknown) => {
seenUpdates.push(update as { update_id: number; message: { text: string } });
});
const { payload, sizeBytes } = createNearLimitTelegramPayload();
expect(sizeBytes).toBeLessThan(1_024 * 1_024);
@@ -415,7 +402,7 @@ async function runNearLimitPayloadTest(mode: "single" | "random-chunked"): Promi
});
expect(response.statusCode).toBe(200);
expectSingleNearLimitUpdate({ seenUpdates, expected });
await vi.waitFor(() => expectSingleNearLimitUpdate({ seenUpdates, expected }));
},
);
}
@@ -424,7 +411,6 @@ describe("startTelegramWebhook", () => {
it("starts server, registers webhook, and serves health", async () => {
initSpy.mockClear();
createTelegramBotSpy.mockClear();
webhookCallbackSpy.mockClear();
const runtimeLog = vi.fn();
const cfg = { bindings: [] };
await withStartedWebhook(
@@ -445,19 +431,6 @@ describe("startTelegramWebhook", () => {
expect(health.status).toBe(200);
expect(initSpy).toHaveBeenCalledTimes(1);
expect(setWebhookSpy).toHaveBeenCalled();
expect(webhookCallbackSpy).toHaveBeenCalledWith(
expect.objectContaining({
api: expect.objectContaining({
setWebhook: expect.any(Function),
}),
}),
"callback",
{
secretToken: TELEGRAM_SECRET,
onTimeout: "return",
timeoutMilliseconds: 5_000,
},
);
expect(runtimeLog).toHaveBeenCalledWith(
expect.stringContaining("webhook local listener on http://127.0.0.1:"),
);
@@ -503,7 +476,7 @@ describe("startTelegramWebhook", () => {
});
it("invokes webhook handler on matching path", async () => {
handlerSpy.mockClear();
handleUpdateSpy.mockClear();
createTelegramBotSpy.mockClear();
const cfg = { bindings: [] };
await withStartedWebhook(
@@ -527,37 +500,23 @@ describe("startTelegramWebhook", () => {
secret: TELEGRAM_SECRET,
});
expect(response.status).toBe(200);
expect(handlerSpy).toHaveBeenCalledWith(
JSON.parse(payload),
expect.any(Function),
TELEGRAM_SECRET,
expect.any(Function),
);
await vi.waitFor(() => expect(handleUpdateSpy).toHaveBeenCalledWith(JSON.parse(payload)));
},
);
});
it("returns after grammY timeout reply while webhook work continues", async () => {
it("acks before webhook update processing finishes", async () => {
let finishWork: (() => void) | undefined;
let workStarted = false;
let workFinished = false;
webhookCallbackSpy.mockImplementationOnce(
() =>
vi.fn(
async (
update: unknown,
reply: (json: string) => Promise<void>,
_secretHeader: string | undefined,
_unauthorized: () => Promise<void>,
) => {
expect(update).toEqual({ update_id: 2, message: { text: "slow" } });
await reply("{}");
await new Promise<void>((resolve) => {
finishWork = resolve;
});
workFinished = true;
},
) as unknown as typeof handlerSpy,
);
handleUpdateSpy.mockImplementationOnce(async (update: unknown) => {
expect(update).toEqual({ update_id: 2, message: { text: "slow" } });
workStarted = true;
await new Promise<void>((resolve) => {
finishWork = resolve;
});
workFinished = true;
});
await withStartedWebhook(
{
@@ -565,14 +524,6 @@ describe("startTelegramWebhook", () => {
path: TELEGRAM_WEBHOOK_PATH,
},
async ({ port }) => {
expect(webhookCallbackSpy).toHaveBeenCalledWith(
expect.anything(),
"callback",
expect.objectContaining({
onTimeout: "return",
timeoutMilliseconds: 5_000,
}),
);
const response = await postWebhookJson({
url: webhookUrl(port, TELEGRAM_WEBHOOK_PATH),
payload: JSON.stringify({ update_id: 2, message: { text: "slow" } }),
@@ -581,18 +532,48 @@ describe("startTelegramWebhook", () => {
});
expect(response.status).toBe(200);
expect(await response.text()).toBe("{}");
expect(await response.text()).toBe("");
await vi.waitFor(() => expect(workStarted).toBe(true));
expect(workFinished).toBe(false);
finishWork?.();
await sleep(WEBHOOK_TEST_YIELD_MS);
expect(workFinished).toBe(true);
await vi.waitFor(() => expect(workFinished).toBe(true));
},
);
});
it("logs update processing failures after acknowledging Telegram", async () => {
const runtimeLog = vi.fn();
handleUpdateSpy.mockRejectedValueOnce(new Error("agent turn failed"));
await withStartedWebhook(
{
secret: TELEGRAM_SECRET,
path: TELEGRAM_WEBHOOK_PATH,
runtime: { log: runtimeLog, error: vi.fn(), exit: vi.fn() },
},
async ({ port }) => {
const response = await postWebhookJson({
url: webhookUrl(port, TELEGRAM_WEBHOOK_PATH),
payload: JSON.stringify({ update_id: 3, message: { text: "boom" } }),
secret: TELEGRAM_SECRET,
});
expect(response.status).toBe(200);
expect(await response.text()).toBe("");
await vi.waitFor(() =>
expect(runtimeLog).toHaveBeenCalledWith(
expect.stringContaining(
"webhook update processing failed after ack: agent turn failed",
),
),
);
},
);
});
it("rejects unauthenticated requests before reading the request body", async () => {
handlerSpy.mockClear();
handleUpdateSpy.mockClear();
await withStartedWebhook(
{
secret: TELEGRAM_SECRET,
@@ -608,13 +589,13 @@ describe("startTelegramWebhook", () => {
expect(response.statusCode).toBe(401);
expect(response.body).toBe("unauthorized");
expect(handlerSpy).not.toHaveBeenCalled();
expect(handleUpdateSpy).not.toHaveBeenCalled();
},
);
});
it("rate limits repeated invalid secret guesses before authentication succeeds", async () => {
handlerSpy.mockClear();
handleUpdateSpy.mockClear();
await withStartedWebhook(
{
secret: TELEGRAM_SECRET,
@@ -649,13 +630,13 @@ describe("startTelegramWebhook", () => {
});
expect(validResponse.status).toBe(429);
expect(await validResponse.text()).toBe("Too Many Requests");
expect(handlerSpy).not.toHaveBeenCalled();
expect(handleUpdateSpy).not.toHaveBeenCalled();
},
);
});
it("uses the forwarded client ip when trusted proxies are configured", async () => {
handlerSpy.mockClear();
handleUpdateSpy.mockClear();
await withStartedWebhook(
{
secret: TELEGRAM_SECRET,
@@ -702,13 +683,13 @@ describe("startTelegramWebhook", () => {
);
expect(isolatedClient.status).toBe(200);
expect(handlerSpy).toHaveBeenCalledTimes(1);
await vi.waitFor(() => expect(handleUpdateSpy).toHaveBeenCalledTimes(1));
},
);
});
it("keeps rate-limit state isolated per webhook listener", async () => {
handlerSpy.mockClear();
handleUpdateSpy.mockClear();
const firstAbort = new AbortController();
const secondAbort = new AbortController();
const first = await startTelegramWebhook({
@@ -748,7 +729,7 @@ describe("startTelegramWebhook", () => {
});
expect(secondResponse.status).toBe(200);
expect(handlerSpy).toHaveBeenCalledTimes(1);
await vi.waitFor(() => expect(handleUpdateSpy).toHaveBeenCalledTimes(1));
} finally {
firstAbort.abort();
secondAbort.abort();
@@ -788,11 +769,11 @@ describe("startTelegramWebhook", () => {
);
});
it("keeps webhook payload readable when callback delays body read", async () => {
handlerSpy.mockImplementationOnce(async (...args: unknown[]) => {
const [update, reply] = args as [unknown, (json: string) => Promise<void>];
it("keeps webhook payload readable when update processing is delayed", async () => {
let seenUpdate: unknown;
handleUpdateSpy.mockImplementationOnce(async (update: unknown) => {
await sleep(WEBHOOK_TEST_YIELD_MS);
await reply(JSON.stringify(update));
seenUpdate = update;
});
await withStartedWebhook(
@@ -808,21 +789,19 @@ describe("startTelegramWebhook", () => {
secret: TELEGRAM_SECRET,
});
expect(res.status).toBe(200);
const responseBody = await res.text();
expect(JSON.parse(responseBody)).toEqual(JSON.parse(payload));
expect(await res.text()).toBe("");
await vi.waitFor(() => expect(seenUpdate).toEqual(JSON.parse(payload)));
},
);
});
it("keeps webhook payload readable across multiple delayed reads", async () => {
const seenPayloads: string[] = [];
const delayedHandler = async (...args: unknown[]) => {
const [update, reply] = args as [unknown, (json: string) => Promise<void>];
const delayedHandler = async (update: unknown) => {
await sleep(WEBHOOK_TEST_YIELD_MS);
seenPayloads.push(JSON.stringify(update));
await reply("ok");
};
handlerSpy.mockImplementationOnce(delayedHandler).mockImplementationOnce(delayedHandler);
handleUpdateSpy.mockImplementationOnce(delayedHandler).mockImplementationOnce(delayedHandler);
await withStartedWebhook(
{
@@ -844,30 +823,21 @@ describe("startTelegramWebhook", () => {
expect(res.status).toBe(200);
}
expect(seenPayloads.map((x) => JSON.parse(x))).toEqual(payloads.map((x) => JSON.parse(x)));
await vi.waitFor(() =>
expect(seenPayloads.map((x) => JSON.parse(x))).toEqual(
payloads.map((x) => JSON.parse(x)),
),
);
},
);
});
it("processes a second request after first-request delayed-init data loss", async () => {
const seenUpdates: unknown[] = [];
webhookCallbackSpy.mockImplementationOnce(
() =>
vi.fn(
(
update: unknown,
reply: (json: string) => Promise<void>,
_secretHeader: string | undefined,
_unauthorized: () => Promise<void>,
) => {
seenUpdates.push(update);
void (async () => {
await sleep(WEBHOOK_TEST_YIELD_MS);
await reply("ok");
})();
},
) as unknown as typeof handlerSpy,
);
handleUpdateSpy.mockImplementation(async (update: unknown) => {
await sleep(WEBHOOK_TEST_YIELD_MS);
seenUpdates.push(update);
});
await withStartedWebhook(
{
@@ -896,7 +866,9 @@ describe("startTelegramWebhook", () => {
expect(firstResponse.statusCode).toBe(200);
expect(secondResponse.statusCode).toBe(200);
expect(seenUpdates).toEqual([JSON.parse(firstPayload), JSON.parse(secondPayload)]);
await vi.waitFor(() =>
expect(seenUpdates).toEqual([JSON.parse(firstPayload), JSON.parse(secondPayload)]),
);
},
);
});
@@ -910,7 +882,7 @@ describe("startTelegramWebhook", () => {
});
it("rejects payloads larger than 1MB before invoking webhook handler", async () => {
handlerSpy.mockClear();
handleUpdateSpy.mockClear();
await withStartedWebhook(
{
secret: TELEGRAM_SECRET,
@@ -951,7 +923,7 @@ describe("startTelegramWebhook", () => {
} else {
expect(responseOrError.code).toBeOneOf(["ECONNRESET", "EPIPE"]);
}
expect(handlerSpy).not.toHaveBeenCalled();
expect(handleUpdateSpy).not.toHaveBeenCalled();
},
);
});

View File

@@ -28,8 +28,6 @@ import { createTelegramBot } from "./bot.js";
const TELEGRAM_WEBHOOK_MAX_BODY_BYTES = 1024 * 1024;
const TELEGRAM_WEBHOOK_BODY_TIMEOUT_MS = 30_000;
// Keep below Telegram/proxy read timeouts; grammY returns early while the handler continues.
const TELEGRAM_WEBHOOK_CALLBACK_TIMEOUT_MS = 5_000;
const InputFileCtor: typeof grammy.InputFile =
typeof grammy.InputFile === "function"
? grammy.InputFile
@@ -278,12 +276,6 @@ export async function startTelegramWebhook(opts: {
maxRequests: WEBHOOK_RATE_LIMIT_DEFAULTS.maxRequests,
maxTrackedKeys: WEBHOOK_RATE_LIMIT_DEFAULTS.maxTrackedKeys,
});
const handler = grammy.webhookCallback(bot, "callback", {
secretToken: secret,
onTimeout: "return",
timeoutMilliseconds: TELEGRAM_WEBHOOK_CALLBACK_TIMEOUT_MS,
});
if (diagnosticsEnabled) {
startDiagnosticHeartbeat(opts.config);
}
@@ -353,38 +345,29 @@ export async function startTelegramWebhook(opts: {
return;
}
let replied = false;
const reply = async (json: string) => {
if (replied) {
return;
}
replied = true;
if (res.headersSent || res.writableEnded) {
return;
}
res.writeHead(200, { "Content-Type": "application/json; charset=utf-8" });
res.end(json);
};
const unauthorized = async () => {
if (replied) {
return;
}
replied = true;
respondText(401, "unauthorized");
};
respondText(200);
await handler(body.value, reply, secretHeader, unauthorized);
if (!replied) {
respondText(200);
}
void (async () => {
await bot.handleUpdate(body.value as Parameters<typeof bot.handleUpdate>[0]);
if (diagnosticsEnabled) {
logWebhookProcessed({
channel: "telegram",
updateType: "telegram-post",
durationMs: Date.now() - startTime,
});
}
if (diagnosticsEnabled) {
logWebhookProcessed({
channel: "telegram",
updateType: "telegram-post",
durationMs: Date.now() - startTime,
});
}
})().catch((err) => {
const errMsg = formatErrorMessage(err);
if (diagnosticsEnabled) {
logWebhookError({
channel: "telegram",
updateType: "telegram-post",
error: errMsg,
});
}
runtime.log?.(`webhook update processing failed after ack: ${errMsg}`);
});
})().catch((err) => {
const errMsg = formatErrorMessage(err);
if (diagnosticsEnabled) {
@@ -394,7 +377,7 @@ export async function startTelegramWebhook(opts: {
error: errMsg,
});
}
runtime.log?.(`webhook handler failed: ${errMsg}`);
runtime.log?.(`webhook request failed: ${errMsg}`);
respondText(500);
});
});