Fix Telegram polling ingress under event-loop stalls (#81746)

* fix telegram polling ingress under event-loop stalls

* add changelog for telegram ingress fix
This commit is contained in:
Josh Avant
2026-05-14 03:35:06 -05:00
committed by GitHub
parent c35634c729
commit fd244fd76d
13 changed files with 791 additions and 51 deletions

View File

@@ -20,6 +20,7 @@ Docs: https://docs.openclaw.ai
- Memory/daily-files: widen the daily-memory file matcher used by Dreaming, rem-backfill, rem-harness, the doctor sweep, and short-term promotion so `memory/YYYY-MM-DD-<slug>.md` files written by the bundled session-memory hook (and any future slugged variants) are discovered alongside the date-only `memory/YYYY-MM-DD.md` shape. Date extraction still uses the leading `YYYY-MM-DD` capture group, so per-day ingestion/promotion semantics are unchanged for existing date-only files; slugged files now flow through the same paths instead of being silently skipped. Fixes #69536. Thanks @jack-stormentswe.
- Security/sandbox: include Windows `USERPROFILE` in the sandbox blocked home roots so credential-bearing binds (such as `.codex`, `.openclaw`, or `.ssh` under the Windows user profile) are denied even when `HOME` points at a different shell home. (#63074) Thanks @luoyanglang.
- Gateway/OpenAI-compatible HTTP: parse shared JSON endpoint paths without trusting malformed Host headers, avoiding 500s before `/v1/chat/completions`, `/v1/responses`, and `/v1/embeddings` request handling.
- Telegram: keep Bot API polling alive during main event-loop stalls by moving ingress to an isolated worker with a durable local spool. Fixes #81132. (#81746) Thanks @joshavant.
- Voice-call webhooks: parse webhook and realtime upgrade paths without trusting malformed Host headers, avoiding 500s before provider signature checks or path rejection.
- Media store: reject malformed redirect `Location` headers as media-download failures instead of letting URL parsing escape the async response callback.
- ClickClack: skip malformed realtime websocket frames instead of stopping the channel monitor on a single bad JSON event.

View File

@@ -115,6 +115,11 @@ type RunnerStub = {
isRunning: () => boolean;
};
/**
 * Test helper: returns a copy of `opts` whose `isolatedIngress.enabled` defaults to `false`.
 * Fields already present on `opts.isolatedIngress` are spread last, so an explicit
 * `enabled` supplied by the caller still wins.
 */
const withLegacyPolling = (opts: MonitorTelegramOpts): MonitorTelegramOpts => {
  const isolatedIngress = { enabled: false, ...opts.isolatedIngress };
  return { ...opts, isolatedIngress };
};
const makeRunnerStub = (overrides: Partial<RunnerStub> = {}): RunnerStub => ({
task: overrides.task ?? (() => Promise.resolve()),
stop: overrides.stop ?? vi.fn<() => void | Promise<void>>(),
@@ -197,7 +202,7 @@ async function expectOffsetConfirmationSkipped(offset: number | null) {
api.deleteWebhook.mockResolvedValueOnce(true);
mockRunOnceAndAbort(abort);
await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
await monitorTelegramProvider(withLegacyPolling({ token: "tok", abortSignal: abort.signal }));
expect(api.getUpdates).not.toHaveBeenCalled();
}
@@ -225,7 +230,7 @@ async function runMonitorAndCaptureStartupOrder(params?: { persistedOffset?: num
return makeAbortRunner(abort);
});
await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
await monitorTelegramProvider(withLegacyPolling({ token: "tok", abortSignal: abort.signal }));
return { order };
}
@@ -305,6 +310,7 @@ async function monitorWithAutoAbort(opts: Omit<MonitorTelegramOpts, "abortSignal
token: "tok",
...opts,
abortSignal: abort.signal,
isolatedIngress: { enabled: false, ...opts.isolatedIngress },
});
}
@@ -496,7 +502,7 @@ describe("monitorTelegramProvider (grammY)", () => {
)
.mockImplementationOnce(() => makeAbortRunner(abort));
await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
await monitorTelegramProvider(withLegacyPolling({ token: "tok", abortSignal: abort.signal }));
expectRecoverableRetryState(2);
});
@@ -516,7 +522,7 @@ describe("monitorTelegramProvider (grammY)", () => {
api.deleteWebhook.mockRejectedValueOnce(cleanupError);
mockRunOnceAndAbort(abort);
await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
await monitorTelegramProvider(withLegacyPolling({ token: "tok", abortSignal: abort.signal }));
expect(api.deleteWebhook).toHaveBeenCalledTimes(1);
expect(api.getWebhookInfo).not.toHaveBeenCalled();
@@ -531,7 +537,7 @@ describe("monitorTelegramProvider (grammY)", () => {
api.deleteWebhook.mockRejectedValueOnce(cleanupError);
mockRunOnceAndAbort(abort);
await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
await monitorTelegramProvider(withLegacyPolling({ token: "tok", abortSignal: abort.signal }));
expect(api.deleteWebhook).toHaveBeenCalledTimes(1);
expect(api.getWebhookInfo).not.toHaveBeenCalled();
@@ -545,7 +551,7 @@ describe("monitorTelegramProvider (grammY)", () => {
createTelegramBotErrors.push(setupError);
mockRunOnceAndAbort(abort);
await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
await monitorTelegramProvider(withLegacyPolling({ token: "tok", abortSignal: abort.signal }));
expectRecoverableRetryState(1);
});
@@ -571,7 +577,7 @@ describe("monitorTelegramProvider (grammY)", () => {
return makeAbortRunner(abort);
});
await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
await monitorTelegramProvider(withLegacyPolling({ token: "tok", abortSignal: abort.signal }));
expect(firstStop).toHaveBeenCalled();
expectRecoverableRetryState(2);
@@ -581,7 +587,7 @@ describe("monitorTelegramProvider (grammY)", () => {
const abort = new AbortController();
mockRunOnceAndAbort(abort);
await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
await monitorTelegramProvider(withLegacyPolling({ token: "tok", abortSignal: abort.signal }));
expect(createdBotStops.length).toBe(1);
expect(createdBotStops[0]).toHaveBeenCalledTimes(1);
@@ -591,10 +597,12 @@ describe("monitorTelegramProvider (grammY)", () => {
const abort = new AbortController();
const firstCycle = mockRunOnceWithStalledPollingRunner();
const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
const monitor = monitorTelegramProvider(
withLegacyPolling({ token: "tok", abortSignal: abort.signal }),
);
await firstCycle.waitForRunStart();
await expect(monitorTelegramProvider({ token: "tok" })).rejects.toThrow(
await expect(monitorTelegramProvider(withLegacyPolling({ token: "tok" }))).rejects.toThrow(
"refusing duplicate poller",
);
expect(runSpy).toHaveBeenCalledTimes(1);
@@ -609,15 +617,19 @@ describe("monitorTelegramProvider (grammY)", () => {
const firstCycle = mockRunOnceWithStalledPollingRunner();
const secondCycle = mockRunOnceWithStalledPollingRunner();
const firstMonitor = monitorTelegramProvider({
token: "tok-a",
abortSignal: firstAbort.signal,
});
const firstMonitor = monitorTelegramProvider(
withLegacyPolling({
token: "tok-a",
abortSignal: firstAbort.signal,
}),
);
await firstCycle.waitForRunStart();
const secondMonitor = monitorTelegramProvider({
token: "tok-b",
abortSignal: secondAbort.signal,
});
const secondMonitor = monitorTelegramProvider(
withLegacyPolling({
token: "tok-b",
abortSignal: secondAbort.signal,
}),
);
await secondCycle.waitForRunStart();
expect(runSpy).toHaveBeenCalledTimes(2);
@@ -632,18 +644,22 @@ describe("monitorTelegramProvider (grammY)", () => {
const secondAbort = new AbortController();
const firstCycle = mockRunOnceWithStalledPollingRunner();
const firstMonitor = monitorTelegramProvider({
token: "tok",
abortSignal: firstAbort.signal,
});
const firstMonitor = monitorTelegramProvider(
withLegacyPolling({
token: "tok",
abortSignal: firstAbort.signal,
}),
);
await firstCycle.waitForRunStart();
firstAbort.abort();
const secondCycle = mockRunOnceAndAbort(secondAbort);
const secondMonitor = monitorTelegramProvider({
token: "tok",
abortSignal: secondAbort.signal,
});
const secondMonitor = monitorTelegramProvider(
withLegacyPolling({
token: "tok",
abortSignal: secondAbort.signal,
}),
);
await secondCycle.waitForRunStart();
await Promise.all([firstMonitor, secondMonitor]);
@@ -656,7 +672,7 @@ describe("monitorTelegramProvider (grammY)", () => {
const abort = new AbortController();
mockRunOnceAndAbort(abort);
await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
await monitorTelegramProvider(withLegacyPolling({ token: "tok", abortSignal: abort.signal }));
expect(vi.getTimerCount()).toBe(0);
} finally {
@@ -671,7 +687,9 @@ describe("monitorTelegramProvider (grammY)", () => {
}),
);
await expect(monitorTelegramProvider({ token: "tok" })).rejects.toThrow("bad token");
await expect(monitorTelegramProvider(withLegacyPolling({ token: "tok" }))).rejects.toThrow(
"bad token",
);
});
it("force-restarts polling when unhandled network rejection stalls runner", async () => {
@@ -679,7 +697,9 @@ describe("monitorTelegramProvider (grammY)", () => {
const firstCycle = mockRunOnceWithStalledPollingRunner();
const secondCycle = mockRunOnceWithStalledPollingRunner();
const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
const monitor = monitorTelegramProvider(
withLegacyPolling({ token: "tok", abortSignal: abort.signal }),
);
await firstCycle.waitForRunStart();
expect(emitUnhandledRejection(await makeTaggedPollingFetchError())).toBe(true);
@@ -697,7 +717,9 @@ describe("monitorTelegramProvider (grammY)", () => {
const firstCycle = mockRunOnceWithStalledPollingRunner();
const secondCycle = mockRunOnceWithStalledPollingRunner();
const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
const monitor = monitorTelegramProvider(
withLegacyPolling({ token: "tok", abortSignal: abort.signal }),
);
await firstCycle.waitForRunStart();
expect(emitUncaughtException(await makeTaggedPollingFetchError())).toBe(true);
@@ -713,7 +735,9 @@ describe("monitorTelegramProvider (grammY)", () => {
const firstCycle = mockRunOnceWithStalledPollingRunner();
const secondCycle = mockRunOnceWithStalledPollingRunner();
const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
const monitor = monitorTelegramProvider(
withLegacyPolling({ token: "tok", abortSignal: abort.signal }),
);
await firstCycle.waitForRunStart();
expect(emitUncaughtException(await makeTaggedPollingHttpError())).toBe(true);
@@ -745,7 +769,9 @@ describe("monitorTelegramProvider (grammY)", () => {
const firstCycle = mockRunOnceWithStalledPollingRunner();
const secondCycle = mockRunOnceAndAbort(abort);
const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
const monitor = monitorTelegramProvider(
withLegacyPolling({ token: "tok", abortSignal: abort.signal }),
);
await firstCycle.waitForRunStart();
vi.advanceTimersByTime(150_000);
@@ -779,7 +805,9 @@ describe("monitorTelegramProvider (grammY)", () => {
.mockReturnValueOnce(rebuiltTransport);
const secondCycle = mockRunOnceAndAbort(abort);
const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
const monitor = monitorTelegramProvider(
withLegacyPolling({ token: "tok", abortSignal: abort.signal }),
);
await firstCycle.waitForRunStart();
expect(emitUnhandledRejection(await makeTaggedPollingFetchError())).toBe(true);
@@ -798,7 +826,9 @@ describe("monitorTelegramProvider (grammY)", () => {
const { stop, waitForTaskStart } = mockRunOnceWithStalledPollingRunner();
const secondCycle = mockRunOnceAndAbort(abort);
const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
const monitor = monitorTelegramProvider(
withLegacyPolling({ token: "tok", abortSignal: abort.signal }),
);
await waitForTaskStart();
const firstSignal = createTelegramBotCalls[0]?.fetchAbortSignal;
expect(firstSignal).toBeInstanceOf(AbortSignal);
@@ -817,7 +847,9 @@ describe("monitorTelegramProvider (grammY)", () => {
const firstCycle = mockRunOnceWithStalledPollingRunner();
const { stop } = firstCycle;
const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
const monitor = monitorTelegramProvider(
withLegacyPolling({ token: "tok", abortSignal: abort.signal }),
);
await firstCycle.waitForRunStart();
const slackDnsError = Object.assign(
@@ -891,7 +923,9 @@ describe("monitorTelegramProvider (grammY)", () => {
const { stop } = firstCycle;
const secondCycle = mockRunOnceAndAbort(abort);
const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
const monitor = monitorTelegramProvider(
withLegacyPolling({ token: "tok", abortSignal: abort.signal }),
);
await firstCycle.waitForRunStart();
// Advance time past the stall threshold (120s) + watchdog interval (30s)
@@ -910,14 +944,16 @@ describe("monitorTelegramProvider (grammY)", () => {
const firstCycle = mockRunOnceWithStalledPollingRunner();
const secondCycle = mockRunOnceAndAbort(abort);
const monitor = monitorTelegramProvider({
token: "tok",
abortSignal: abort.signal,
config: {
agents: { defaults: { maxConcurrent: 2 } },
channels: { telegram: { pollingStallThresholdMs: 30_000 } },
},
});
const monitor = monitorTelegramProvider(
withLegacyPolling({
token: "tok",
abortSignal: abort.signal,
config: {
agents: { defaults: { maxConcurrent: 2 } },
channels: { telegram: { pollingStallThresholdMs: 30_000 } },
},
}),
);
await firstCycle.waitForRunStart();
vi.advanceTimersByTime(60_000);
@@ -996,7 +1032,7 @@ describe("monitorTelegramProvider (grammY)", () => {
return makeAbortRunner(abort);
});
await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
await monitorTelegramProvider(withLegacyPolling({ token: "tok", abortSignal: abort.signal }));
// deleteWebhook should be called twice: once on initial cleanup, once after 409 reset
expect(api.deleteWebhook).toHaveBeenCalledTimes(2);

View File

@@ -291,6 +291,13 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
createTelegramTransport: createTelegramTransportForPolling,
stallThresholdMs: account.config.pollingStallThresholdMs,
setStatus: opts.setStatus,
isolatedIngress: {
enabled: opts.isolatedIngress?.enabled ?? true,
apiRoot: account.config.apiRoot,
timeoutSeconds: account.config.timeoutSeconds,
proxy: account.config.proxy,
network: account.config.network,
},
});
await pollingSession.runUntilAbort();
} finally {

View File

@@ -23,6 +23,9 @@ export type MonitorTelegramOpts = {
webhookCertPath?: string;
botInfo?: TelegramBotInfo;
setStatus?: (patch: Omit<ChannelAccountSnapshot, "accountId">) => void;
isolatedIngress?: {
enabled?: boolean;
};
};
export type TelegramMonitorFn = (opts?: MonitorTelegramOpts) => Promise<void>;

View File

@@ -1,3 +1,6 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import type { ChannelAccountSnapshot } from "openclaw/plugin-sdk/channel-contract";
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
@@ -30,6 +33,7 @@ vi.mock("openclaw/plugin-sdk/runtime-env", () => ({
}));
let TelegramPollingSession: typeof import("./polling-session.js").TelegramPollingSession;
let writeTelegramSpooledUpdate: typeof import("./telegram-ingress-spool.js").writeTelegramSpooledUpdate;
type TelegramApiMiddleware = (
prev: (...args: unknown[]) => Promise<unknown>,
@@ -192,6 +196,7 @@ function createPollingSession(params: {
createTelegramTransport?: () => ReturnType<typeof makeTelegramTransport>;
stallThresholdMs?: number;
setStatus?: (patch: Omit<ChannelAccountSnapshot, "accountId">) => void;
isolatedIngress?: ConstructorParameters<typeof TelegramPollingSession>[0]["isolatedIngress"];
}) {
return new TelegramPollingSession({
token: "tok",
@@ -207,6 +212,7 @@ function createPollingSession(params: {
telegramTransport: params.telegramTransport,
stallThresholdMs: params.stallThresholdMs,
setStatus: params.setStatus,
isolatedIngress: params.isolatedIngress,
...(params.createTelegramTransport
? { createTelegramTransport: params.createTelegramTransport }
: {}),
@@ -262,6 +268,7 @@ async function waitForApiMiddleware(
describe("TelegramPollingSession", () => {
beforeAll(async () => {
({ TelegramPollingSession } = await import("./polling-session.js"));
({ writeTelegramSpooledUpdate } = await import("./telegram-ingress-spool.js"));
});
beforeEach(() => {
@@ -365,6 +372,70 @@ describe("TelegramPollingSession", () => {
expect(bot.api.getUpdates).not.toHaveBeenCalled();
});
// Scenario: an update is already sitting in the spool when the session starts with isolated
// ingress enabled. The session must hand it to the bot's handleUpdate, delete the spool file,
// and create the bot without an `updateOffset` option.
it("drains isolated ingress spool through the main-thread bot without offset watermark skipping", async () => {
const abort = new AbortController();
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-telegram-spool-"));
const handleUpdate = vi.fn(async () => undefined);
// Minimal bot double: only the members this test exercises are provided.
const bot = {
api: {
deleteWebhook: vi.fn(async () => true),
config: { use: vi.fn() },
},
handleUpdate,
stop: vi.fn(async () => undefined),
};
createTelegramBotMock.mockReturnValueOnce(bot);
// Pre-seed the spool so the first drain has something to deliver.
await writeTelegramSpooledUpdate({
spoolDir: tempDir,
update: { update_id: 42, message: { text: "hello" } },
});
// Fake worker: task() stays pending until stop() is called, standing in for a long-running poller.
let stopWorker: (() => void) | undefined;
const workerDone = new Promise<void>((resolve) => {
stopWorker = resolve;
});
const createWorker = vi.fn(() => ({
onMessage: vi.fn(() => () => undefined),
stop: vi.fn(async () => {
stopWorker?.();
}),
task: vi.fn(async () => {
await workerDone;
}),
}));
try {
const session = createPollingSession({
abortSignal: abort.signal,
isolatedIngress: {
enabled: true,
spoolDir: tempDir,
createWorker,
// The session floors the drain interval at 100 ms, so this requests the minimum.
drainIntervalMs: 10,
},
});
const runPromise = session.runUntilAbort();
// Wait for the spooled update to reach the handler before shutting the session down.
await vi.waitFor(() => expect(handleUpdate).toHaveBeenCalledTimes(1));
abort.abort();
await runPromise;
expect(createWorker).toHaveBeenCalledWith(
expect.objectContaining({
initialUpdateId: null,
spoolDir: tempDir,
token: "tok",
}),
);
// With isolated ingress enabled the bot is created without an update-offset option.
expect(
mockObjectArg(createTelegramBotMock, "createTelegramBot").updateOffset,
).toBeUndefined();
expect(handleUpdate).toHaveBeenCalledWith({ update_id: 42, message: { text: "hello" } });
// The handled update's spool file must have been removed.
await expect(fs.readdir(tempDir)).resolves.toEqual([]);
} finally {
await fs.rm(tempDir, { recursive: true, force: true });
}
});
it("forces a restart when polling stalls without getUpdates activity", async () => {
const abort = new AbortController();
const botStop = vi.fn(async () => undefined);

View File

@@ -1,5 +1,6 @@
import { type RunOptions, run } from "@grammyjs/runner";
import type { ChannelAccountSnapshot } from "openclaw/plugin-sdk/channel-contract";
import type { TelegramNetworkConfig } from "openclaw/plugin-sdk/config-contracts";
import {
computeBackoff,
formatDurationPrecise,
@@ -15,6 +16,15 @@ import { TelegramPollingLivenessTracker } from "./polling-liveness.js";
import { createTelegramPollingStatusPublisher } from "./polling-status.js";
import { TelegramPollingTransportState } from "./polling-transport-state.js";
import { TELEGRAM_GET_UPDATES_REQUEST_TIMEOUT_MS } from "./request-timeouts.js";
import {
deleteTelegramSpooledUpdate,
listTelegramSpooledUpdates,
resolveTelegramIngressSpoolDir,
} from "./telegram-ingress-spool.js";
import {
createTelegramIngressWorker,
type TelegramIngressWorkerFactory,
} from "./telegram-ingress-worker.js";
const TELEGRAM_POLL_RESTART_POLICY = {
initialMs: 2000,
@@ -80,6 +90,16 @@ type TelegramPollingSessionOpts = {
/** Stall detection threshold in ms. Defaults to 120_000 (2 min). */
stallThresholdMs?: number;
setStatus?: (patch: Omit<ChannelAccountSnapshot, "accountId">) => void;
isolatedIngress?: {
enabled: boolean;
apiRoot?: string;
timeoutSeconds?: number;
proxy?: string;
network?: TelegramNetworkConfig;
spoolDir?: string;
createWorker?: TelegramIngressWorkerFactory;
drainIntervalMs?: number;
};
};
export class TelegramPollingSession {
@@ -135,7 +155,9 @@ export class TelegramPollingSession {
return;
}
const state = await this.#runPollingCycle(bot);
const state = this.opts.isolatedIngress?.enabled
? await this.#runIsolatedIngressCycle(bot)
: await this.#runPollingCycle(bot);
if (state === "exit") {
return;
}
@@ -181,6 +203,12 @@ export class TelegramPollingSession {
const fetchAbortController = new AbortController();
this.#activeFetchAbort = fetchAbortController;
const telegramTransport = this.#transportState.acquireForNextCycle();
const updateOffset = this.opts.isolatedIngress?.enabled
? undefined
: {
lastUpdateId: this.opts.getLastUpdateId(),
onUpdateId: this.opts.persistUpdateId,
};
try {
return createTelegramBot({
token: this.opts.token,
@@ -191,10 +219,7 @@ export class TelegramPollingSession {
botInfo: this.opts.botInfo,
fetchAbortSignal: fetchAbortController.signal,
minimumClientTimeoutSeconds: TELEGRAM_POLLING_CLIENT_TIMEOUT_FLOOR_SECONDS,
updateOffset: {
lastUpdateId: this.opts.getLastUpdateId(),
onUpdateId: this.opts.persistUpdateId,
},
...(updateOffset ? { updateOffset } : {}),
telegramTransport,
});
} catch (err) {
@@ -233,6 +258,135 @@ export class TelegramPollingSession {
}
}
/**
 * Replays spooled updates through `bot.handleUpdate` in spool order and removes each spool
 * file once its update has been handled.
 *
 * At most 100 entries are read per call. Draining stops early when the abort signal has fired,
 * at the first update whose handler throws (the file is kept so it is retried on a later
 * drain), or at the first handled update whose spool file cannot be removed. Later entries are
 * never processed ahead of an entry that is still on disk.
 *
 * Errors from listing the spool are not caught here and propagate to the caller.
 *
 * @param params.bot Bot whose `handleUpdate` receives each spooled update.
 * @param params.spoolDir Directory to read spooled updates from.
 * @returns Number of updates that were handled and removed from the spool.
 */
async #drainSpooledUpdates(params: { bot: TelegramBot; spoolDir: string }): Promise<number> {
  let handled = 0;
  const updates = await listTelegramSpooledUpdates({ spoolDir: params.spoolDir, limit: 100 });
  for (const update of updates) {
    if (this.opts.abortSignal?.aborted) {
      break;
    }
    try {
      await params.bot.handleUpdate(
        update.update as Parameters<typeof params.bot.handleUpdate>[0],
      );
    } catch (err) {
      this.opts.log(
        `[telegram][diag] spooled update ${update.updateId} handler failed; keeping for retry: ${formatErrorMessage(err)}`,
      );
      break;
    }
    // Removal is reported separately: the handler already ran, so a failure here means the
    // update may be delivered again, which is a different condition from a handler error.
    try {
      await deleteTelegramSpooledUpdate(update);
    } catch (err) {
      this.opts.log(
        `[telegram][diag] spooled update ${update.updateId} was handled but its spool file could not be removed; it may be delivered again: ${formatErrorMessage(err)}`,
      );
      break;
    }
    handled += 1;
  }
  return handled;
}
/**
 * Runs one polling cycle in which an ingress worker writes updates into a spool directory
 * and this method periodically drains that spool through `bot`.
 *
 * Falls back to `#runPollingCycle` when isolated ingress is not enabled.
 *
 * @param bot Bot used to handle spooled updates; it is stopped when the cycle ends.
 * @returns `"exit"` when the abort signal fired or the restart wait declined to continue,
 * otherwise `"continue"` so the caller starts another cycle.
 */
async #runIsolatedIngressCycle(bot: TelegramBot): Promise<"continue" | "exit"> {
const ingress = this.opts.isolatedIngress;
if (!ingress?.enabled) {
return this.#runPollingCycle(bot);
}
// An explicit spoolDir / createWorker (used by tests) overrides the defaults.
const spoolDir =
ingress.spoolDir ?? resolveTelegramIngressSpoolDir({ accountId: this.opts.accountId });
const workerFactory = ingress.createWorker ?? createTelegramIngressWorker;
const worker = workerFactory({
token: this.opts.token,
accountId: this.opts.accountId,
initialUpdateId: this.opts.getLastUpdateId(),
spoolDir,
apiRoot: ingress.apiRoot,
timeoutSeconds: ingress.timeoutSeconds,
network: ingress.network,
proxy: ingress.proxy,
});
this.opts.log(`[telegram][diag] isolated polling ingress started spool=${spoolDir}`);
// Last poll state reported by the worker; only used for the diagnostic line logged when the worker stops.
const pollState: {
startedAt: number | null;
offset: number | null;
outcome: string;
error?: string;
} = {
startedAt: null,
offset: null,
outcome: "not-started",
};
let consecutiveDrainFailures = 0;
const unsubscribe = worker.onMessage((message) => {
if (message.type === "poll-start") {
pollState.startedAt = message.startedAt;
pollState.offset = message.offset;
pollState.outcome = "started";
// A new poll attempt clears the error recorded for the previous one.
delete pollState.error;
return;
}
if (message.type === "poll-success") {
this.#status.notePollSuccess(message.finishedAt);
pollState.outcome = `ok:${message.count}`;
return;
}
if (message.type === "poll-error") {
pollState.outcome = "error";
pollState.error = message.message;
}
});
const stopOnAbort = () => {
void worker.stop();
};
this.opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true });
// Drain interval defaults to 500 ms and is never shorter than 100 ms.
const drainIntervalMs = Math.max(100, Math.floor(ingress.drainIntervalMs ?? 500));
// Guards against overlapping drains started by the timer within this cycle.
let drainActive = false;
const drainOnce = async () => {
if (drainActive || this.opts.abortSignal?.aborted) {
return;
}
drainActive = true;
try {
await this.#drainSpooledUpdates({ bot, spoolDir });
consecutiveDrainFailures = 0;
} catch (err) {
// Drain errors are logged and counted, never rethrown, so the timer keeps running.
consecutiveDrainFailures += 1;
this.opts.log(
`[telegram][diag] isolated polling spool drain failed (${consecutiveDrainFailures}): ${formatErrorMessage(err)}`,
);
} finally {
drainActive = false;
}
};
// Deliver anything already in the spool before the periodic timer starts.
await drainOnce();
const drainTimer = setInterval(() => {
void drainOnce();
}, drainIntervalMs);
// unref (where available) so the timer alone does not keep the process alive.
drainTimer.unref?.();
const stopBot = () => {
return Promise.resolve(bot.stop())
.then(() => undefined)
.catch(() => {
// Bot may already be stopped by shutdown paths.
});
};
try {
// Resolves when the worker finishes, either because it was stopped or because it ended on its own.
await worker.task();
if (this.opts.abortSignal?.aborted) {
return "exit";
}
const errorText = pollState.error ? ` error=${pollState.error}` : "";
this.opts.log(
`[telegram][diag] isolated polling ingress stopped outcome=${pollState.outcome} startedAt=${pollState.startedAt ?? "n/a"} offset=${pollState.offset ?? "n/a"}${errorText}`,
);
const shouldRestart = await this.#waitBeforeRestart(
(delay) => `Telegram isolated polling ingress stopped; restarting in ${delay}.`,
);
return shouldRestart ? "continue" : "exit";
} finally {
clearInterval(drainTimer);
unsubscribe();
this.opts.abortSignal?.removeEventListener("abort", stopOnAbort);
await worker.stop();
// Final drain attempt. drainOnce returns immediately when the signal is aborted or when a
// timer-started drain is still running.
// NOTE(review): in the second case the in-flight drain is not awaited, so the bot below can be
// stopped while a handler is still running, and a restarted cycle could drain the same spool
// concurrently — confirm whether that overlap is acceptable.
await drainOnce();
await waitForGracefulStop(stopBot);
}
}
async #runPollingCycle(bot: TelegramBot): Promise<"continue" | "exit"> {
const liveness = new TelegramPollingLivenessTracker({
onPollSuccess: (finishedAt) => this.#status.notePollSuccess(finishedAt),

View File

@@ -0,0 +1,44 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, it } from "vitest";
import {
deleteTelegramSpooledUpdate,
listTelegramSpooledUpdates,
writeTelegramSpooledUpdate,
} from "./telegram-ingress-spool.js";
describe("Telegram ingress spool", () => {
  it("persists updates durably in update_id order and deletes handled entries", async () => {
    const tempRoot = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-telegram-spool-"));
    try {
      // Written newest-first on purpose: listing must order by update_id, not by write order.
      const outOfOrderWrites = [
        { update: { update_id: 11, message: { text: "second" } }, now: 2 },
        { update: { update_id: 10, message: { text: "first" } }, now: 1 },
      ];
      for (const entry of outOfOrderWrites) {
        await writeTelegramSpooledUpdate({
          spoolDir: tempRoot,
          update: entry.update,
          now: entry.now,
        });
      }

      const spooled = await listTelegramSpooledUpdates({ spoolDir: tempRoot });
      const spooledIds = spooled.map((entry) => entry.updateId);
      const spooledTimestamps = spooled.map((entry) => entry.receivedAt);
      expect(spooledIds).toEqual([10, 11]);
      expect(spooledTimestamps).toEqual([1, 2]);

      const oldest = spooled[0];
      expect(oldest?.update).toEqual({ update_id: 10, message: { text: "first" } });
      if (!oldest) {
        throw new Error("Expected a spooled update");
      }

      // Deleting the handled entry leaves only the newer update behind.
      await deleteTelegramSpooledUpdate(oldest);
      const remaining = await listTelegramSpooledUpdates({ spoolDir: tempRoot });
      expect(remaining.map((entry) => entry.updateId)).toEqual([11]);
    } finally {
      await fs.rm(tempRoot, { recursive: true, force: true });
    }
  });
});

View File

@@ -0,0 +1,133 @@
import { randomUUID } from "node:crypto";
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { readJsonFileWithFallback } from "openclaw/plugin-sdk/json-store";
import { resolveStateDir } from "openclaw/plugin-sdk/state-paths";
// Version stamped into every spool file; parseSpooledUpdate ignores files whose version differs.
const SPOOL_VERSION = 1;
/** JSON document stored in each spool file. */
type TelegramSpooledUpdatePayload = {
// Spool format version the file was written with.
version: number;
// The update's `update_id`, duplicated at the top level of the payload.
updateId: number;
// When the update was spooled: the writer's `now` argument, or `Date.now()` if omitted.
receivedAt: number;
// The update object exactly as it was passed to the writer.
update: unknown;
};
/** A spooled update as returned by listTelegramSpooledUpdates, tied to the file it was read from. */
export type TelegramSpooledUpdate = {
// The update's `update_id` as recorded in the spool file.
updateId: number;
// Location of the spool file; deleteTelegramSpooledUpdate unlinks this path.
path: string;
// The stored update object; not validated beyond the envelope fields.
update: unknown;
// Timestamp recorded when the update was spooled; 0 when the file holds no numeric value.
receivedAt: number;
};
/**
 * Turns an account id into a safe directory-name fragment.
 *
 * Missing, empty or whitespace-only ids become `"default"`. Otherwise the id is trimmed and
 * every run of characters outside `a-z`, `0-9`, `.`, `_`, `-` (case-insensitive) is replaced
 * by a single `_`.
 */
function normalizeAccountId(accountId?: string) {
  const candidate = (accountId ?? "").trim();
  return candidate.length === 0 ? "default" : candidate.replace(/[^a-z0-9._-]+/gi, "_");
}
/** Type guard: true only for non-negative safe integers. */
function isValidUpdateId(value: unknown): value is number {
  if (typeof value !== "number") {
    return false;
  }
  return value >= 0 && Number.isSafeInteger(value);
}
/**
 * Builds the spool directory path for an account:
 * `<state dir>/telegram/ingress-spool-<normalized account id>`.
 *
 * @param params.accountId Account id; normalized by `normalizeAccountId` before use.
 * @param params.env Environment forwarded to `resolveStateDir`.
 * @returns The spool directory path (the directory is not created here).
 */
export function resolveTelegramIngressSpoolDir(params: {
  accountId?: string;
  env?: NodeJS.ProcessEnv;
}): string {
  const { accountId, env } = params;
  const spoolFolder = `ingress-spool-${normalizeAccountId(accountId)}`;
  return path.join(resolveStateDir(env, os.homedir), "telegram", spoolFolder);
}
/**
 * Extracts `update_id` from a raw update.
 *
 * @returns The id when `update` is an object whose `update_id` passes `isValidUpdateId`,
 * otherwise `null`.
 */
export function resolveTelegramUpdateId(update: unknown): number | null {
  if (typeof update !== "object" || update === null) {
    return null;
  }
  const { update_id: candidate } = update as { update_id?: unknown };
  return isValidUpdateId(candidate) ? candidate : null;
}
/**
 * File name for a spooled update: the id left-padded with zeros to 16 digits plus `.json`,
 * so that sorting file names as strings matches numeric id order.
 */
function spoolFileName(updateId: number): string {
  const padded = updateId.toString().padStart(16, "0");
  return padded + ".json";
}
/**
 * Validates the envelope of a decoded spool file and converts it to a `TelegramSpooledUpdate`.
 *
 * @param value Decoded JSON content of the spool file.
 * @param filePath Path the content was read from; copied into the result.
 * @returns The parsed entry, or `null` when `value` is not an object, its `version` is not
 * `SPOOL_VERSION`, or its `updateId` is not a valid update id. A non-numeric `receivedAt`
 * is reported as `0`.
 */
function parseSpooledUpdate(value: unknown, filePath: string): TelegramSpooledUpdate | null {
  if (typeof value !== "object" || value === null) {
    return null;
  }
  const { version, updateId, update, receivedAt } = value as Partial<TelegramSpooledUpdatePayload>;
  if (version !== SPOOL_VERSION) {
    return null;
  }
  if (!isValidUpdateId(updateId)) {
    return null;
  }
  const receivedAtMs = typeof receivedAt === "number" ? receivedAt : 0;
  return { updateId, path: filePath, update, receivedAt: receivedAtMs };
}
/**
 * Persists one update to the spool as `<zero-padded update_id>.json`.
 *
 * The payload is written to a uniquely named `.tmp` file in the same directory, flushed to
 * stable storage, and then renamed over the final name, so a reader never observes a partially
 * written entry and a renamed entry is not left empty by a crash. The temp file is removed
 * if any step fails. Writing the same `update_id` again replaces the existing entry.
 *
 * @param params.spoolDir Spool directory; created (recursively) if missing.
 * @param params.update Raw update; must carry a valid numeric `update_id`.
 * @param params.now Timestamp stored as `receivedAt`; defaults to `Date.now()`.
 * @returns The `update_id` of the spooled update.
 * @throws Error when `update` has no valid `update_id`; file-system errors are rethrown.
 */
export async function writeTelegramSpooledUpdate(params: {
  spoolDir: string;
  update: unknown;
  now?: number;
}): Promise<number> {
  const updateId = resolveTelegramUpdateId(params.update);
  if (updateId === null) {
    throw new Error("Telegram update missing numeric update_id.");
  }
  await fs.mkdir(params.spoolDir, { recursive: true });
  const fileName = spoolFileName(updateId);
  const targetPath = path.join(params.spoolDir, fileName);
  // The `.tmp` suffix keeps in-progress files out of listings, which only accept `.json`.
  const tempPath = path.join(params.spoolDir, `${fileName}.${randomUUID()}.tmp`);
  const payload: TelegramSpooledUpdatePayload = {
    version: SPOOL_VERSION,
    updateId,
    receivedAt: params.now ?? Date.now(),
    update: params.update,
  };
  try {
    const handle = await fs.open(tempPath, "w", 0o600);
    try {
      await handle.writeFile(`${JSON.stringify(payload)}\n`);
      // Flush before the rename so the final name never points at unwritten data after a crash.
      await handle.sync();
    } finally {
      await handle.close();
    }
    await fs.rename(tempPath, targetPath);
  } catch (err) {
    // Best-effort cleanup so failed writes do not accumulate temp files; the original error wins.
    await fs.rm(tempPath, { force: true }).catch(() => undefined);
    throw err;
  }
  return updateId;
}
/**
 * Lists spooled updates in ascending file-name order (which is `update_id` order, because
 * file names are zero-padded).
 *
 * Only `*.json` entries are considered. Files that do not parse into a valid spool entry are
 * skipped and do not count against `limit`, so unreadable or unsupported-version files at the
 * head of the spool cannot starve the valid updates queued behind them.
 *
 * @param params.spoolDir Spool directory; a missing directory yields an empty list.
 * @param params.limit Maximum number of valid updates to return. Defaults to 100, is floored
 * to an integer and raised to at least 1; `NaN` falls back to the default.
 * @returns Up to `limit` valid spooled updates.
 * @throws Any `readdir` error other than `ENOENT`.
 */
export async function listTelegramSpooledUpdates(params: {
  spoolDir: string;
  limit?: number;
}): Promise<TelegramSpooledUpdate[]> {
  let entries: string[];
  try {
    entries = await fs.readdir(params.spoolDir);
  } catch (err) {
    if ((err as { code?: string }).code === "ENOENT") {
      return [];
    }
    throw err;
  }
  const requestedLimit = params.limit ?? 100;
  const limit = Number.isNaN(requestedLimit) ? 100 : Math.max(1, Math.floor(requestedLimit));
  const files = entries.filter((entry) => entry.endsWith(".json")).toSorted();
  const updates: TelegramSpooledUpdate[] = [];
  for (const file of files) {
    // The limit counts valid updates, not files inspected.
    if (updates.length >= limit) {
      break;
    }
    const filePath = path.join(params.spoolDir, file);
    const { value } = await readJsonFileWithFallback<unknown>(filePath, null);
    const parsed = parseSpooledUpdate(value, filePath);
    if (parsed) {
      updates.push(parsed);
    }
  }
  return updates;
}
/**
 * Removes the spool file behind `update`.
 *
 * A file that is already gone (`ENOENT`) is treated as success; any other unlink error is rethrown.
 */
export async function deleteTelegramSpooledUpdate(update: TelegramSpooledUpdate): Promise<void> {
  await fs.unlink(update.path).catch((err: unknown) => {
    const alreadyGone = (err as { code?: string }).code === "ENOENT";
    if (!alreadyGone) {
      throw err;
    }
  });
}

View File

@@ -0,0 +1,175 @@
import { parentPort, workerData } from "node:worker_threads";
import { resolveTelegramAllowedUpdates } from "./allowed-updates.js";
import { normalizeTelegramApiRoot } from "./api-root.js";
import { resolveTelegramTransport } from "./fetch.js";
import { isRecoverableTelegramNetworkError } from "./network-errors.js";
import { makeProxyFetch } from "./proxy.js";
import { TELEGRAM_GET_UPDATES_REQUEST_TIMEOUT_MS } from "./request-timeouts.js";
import { writeTelegramSpooledUpdate } from "./telegram-ingress-spool.js";
import type {
TelegramIngressWorkerMessage,
TelegramIngressWorkerOptions,
} from "./telegram-ingress-worker.js";
import { writeTelegramUpdateOffset } from "./update-offset-store.js";
// Worker configuration, received through `workerData`.
const options = workerData as TelegramIngressWorkerOptions;
// Sent as the `limit` field of each getUpdates request.
const pollLimit = 100;
// Bounds for resolveBackoff, in milliseconds: first retry delay and the ceiling it doubles up to.
const retryInitialMs = 1000;
const retryMaxMs = 30_000;
// Set when a "stop" message arrives from the parent port; the poll loop checks it and exits.
let stopped = false;
// Abort controller of the request currently in flight (set by fetchJson); aborted on stop.
let activeController: AbortController | undefined;
/** Sends a message to the parent thread; does nothing when there is no parent port. */
function post(message: TelegramIngressWorkerMessage): void {
  if (!parentPort) {
    return;
  }
  // oxlint-disable-next-line unicorn/require-post-message-target-origin -- Node worker_threads ports do not accept a targetOrigin argument.
  parentPort.postMessage(message);
}
/** Returns a promise that resolves after `ms` milliseconds. */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((done) => {
    setTimeout(done, ms);
  });
}
/**
 * Renders a thrown value as text: an `Error`'s message (or its name when the message is
 * empty), otherwise the value converted with `String`.
 */
function formatErrorMessage(err: unknown): string {
  return err instanceof Error ? err.message || err.name : String(err);
}
/**
 * Exponential backoff delay in milliseconds: `retryInitialMs` for the first attempt, doubling
 * with each further attempt and capped at `retryMaxMs`.
 */
function resolveBackoff(attempt: number): number {
  const doublings = Math.max(0, attempt - 1);
  const uncappedMs = retryInitialMs * Math.pow(2, doublings);
  return Math.min(retryMaxMs, uncappedMs);
}
// Stop protocol: on a `{ type: "stop" }` message from the parent, mark the worker as stopped
// and abort the request that is currently in flight, if any. Other messages are ignored.
parentPort?.on("message", (message: { type?: string }) => {
  if (message?.type === "stop") {
    stopped = true;
    activeController?.abort(new Error("telegram ingress worker stopped"));
  }
});
/**
 * POSTs `body` as JSON to `url` and returns the `result` field of the Bot API response.
 *
 * The request is registered as `activeController` so a stop message can abort it, and it is
 * aborted on its own after `TELEGRAM_GET_UPDATES_REQUEST_TIMEOUT_MS`.
 *
 * @param params.fetch Fetch implementation to use.
 * @param params.url Full request URL.
 * @param params.body Request payload; serialized with `JSON.stringify`.
 * @returns The response's `result` value.
 * @throws Error carrying the API `description` when present, otherwise the HTTP status, when
 * the response is not OK, is not `ok: true`, or is not a JSON object. Abort and timeout errors
 * are rethrown unchanged.
 */
async function fetchJson(params: {
  fetch: typeof fetch;
  url: string;
  body: unknown;
}): Promise<unknown> {
  const controller = new AbortController();
  activeController = controller;
  const timeout = setTimeout(() => {
    controller.abort(new Error("Telegram getUpdates timed out"));
  }, TELEGRAM_GET_UPDATES_REQUEST_TIMEOUT_MS);
  timeout.unref?.();
  try {
    const response = await params.fetch(params.url, {
      method: "POST",
      headers: { "content-type": "application/json" },
      body: JSON.stringify(params.body),
      signal: controller.signal,
    });
    let json: { ok?: unknown; result?: unknown; description?: unknown } | null = null;
    try {
      const parsed: unknown = await response.json();
      if (typeof parsed === "object" && parsed !== null) {
        json = parsed as { ok?: unknown; result?: unknown; description?: unknown };
      }
    } catch (err) {
      // An abort or timeout while reading the body must surface as such.
      if (controller.signal.aborted) {
        throw err;
      }
      // Otherwise the body was not JSON (e.g. a gateway error page); report the HTTP status
      // below instead of an opaque JSON parse error.
    }
    if (json === null || !response.ok || json.ok !== true) {
      const description = json?.description;
      throw new Error(
        typeof description === "string"
          ? description
          : `Telegram getUpdates failed with HTTP ${response.status}`,
      );
    }
    return json.result;
  } finally {
    clearTimeout(timeout);
    // Only clear the shared slot if it still belongs to this request.
    if (activeController === controller) {
      activeController = undefined;
    }
  }
}
/**
 * Waits out a retry backoff in short slices so that a stop request received in the
 * meantime ends the wait promptly instead of after the full delay.
 */
async function sleepUnlessStopped(ms: number): Promise<void> {
  const sliceMs = 250;
  const deadline = Date.now() + ms;
  let remaining = ms;
  while (!stopped && remaining > 0) {
    await sleep(Math.min(remaining, sliceMs));
    remaining = deadline - Date.now();
  }
}

/**
 * Runs the long-polling loop of the ingress worker.
 *
 * Each iteration posts "poll-start", calls getUpdates, writes every returned update
 * to the spool directory, persists the highest update id seen so far and posts
 * "spooled" per update followed by "poll-success" for the batch. Failures post
 * "poll-error"; recoverable network errors are retried with exponential backoff,
 * anything else is rethrown. The loop ends once a stop was requested, and the
 * transport is always closed on the way out.
 */
async function main(): Promise<void> {
  const proxyFetch = options.proxy ? makeProxyFetch(options.proxy) : undefined;
  const transport = resolveTelegramTransport(proxyFetch, { network: options.network });
  const fetchImpl = transport.fetch ?? globalThis.fetch;
  const apiRoot = normalizeTelegramApiRoot(options.apiRoot ?? "https://api.telegram.org");
  const getUpdatesUrl = `${apiRoot}/bot${options.token}/getUpdates`;
  // Long-poll timeout in whole seconds: at least 1, and 30 when not configured.
  const pollTimeoutSeconds =
    typeof options.timeoutSeconds === "number" && Number.isFinite(options.timeoutSeconds)
      ? Math.max(1, Math.floor(options.timeoutSeconds))
      : 30;
  let lastUpdateId = options.initialUpdateId;
  let failures = 0;
  try {
    for (;;) {
      if (stopped) {
        break;
      }
      // Ask for everything after the last update we stored; omit the offset entirely
      // while no update id is known.
      const offset = lastUpdateId === null ? null : lastUpdateId + 1;
      const startedAt = Date.now();
      post({ type: "poll-start", offset, startedAt });
      try {
        const result = await fetchJson({
          fetch: fetchImpl,
          url: getUpdatesUrl,
          body: {
            timeout: pollTimeoutSeconds,
            limit: pollLimit,
            allowed_updates: resolveTelegramAllowedUpdates(),
            ...(offset === null ? {} : { offset }),
          },
        });
        if (!Array.isArray(result)) {
          throw new Error("Telegram getUpdates returned a non-array result.");
        }
        for (const update of result) {
          if (stopped) {
            break;
          }
          // Spool first, then advance the persisted offset, so the offset never
          // moves past an update that was not written to the spool.
          const updateId = await writeTelegramSpooledUpdate({
            spoolDir: options.spoolDir,
            update,
          });
          if (lastUpdateId === null || updateId > lastUpdateId) {
            lastUpdateId = updateId;
            await writeTelegramUpdateOffset({
              accountId: options.accountId,
              botToken: options.token,
              updateId,
            });
          }
          post({ type: "spooled", updateId, queued: result.length });
        }
        failures = 0;
        post({
          type: "poll-success",
          offset,
          count: result.length,
          finishedAt: Date.now(),
        });
      } catch (err) {
        // Errors caused by (or racing with) a stop request are not reported.
        if (stopped) {
          break;
        }
        failures += 1;
        post({
          type: "poll-error",
          message: formatErrorMessage(err),
          finishedAt: Date.now(),
        });
        if (!isRecoverableTelegramNetworkError(err, { context: "polling" })) {
          throw err;
        }
        // The backoff can reach retryMaxMs; it must stay interruptible so a stop
        // request leads to an orderly exit (including transport.close()) rather
        // than the worker sitting in a timer until it is forcibly terminated.
        await sleepUnlessStopped(resolveBackoff(failures));
      }
    }
  } finally {
    await transport.close();
  }
}
// Worker entry point: run the poll loop and report a fatal failure to the parent.
main()
  .catch((err: unknown) => {
    post({ type: "poll-error", message: formatErrorMessage(err), finishedAt: Date.now() });
    process.exitCode = 1;
  })
  .finally(() => {
    // A MessagePort with a "message" listener is ref'd and keeps the worker's event
    // loop alive. Once the poll loop has ended there is nothing left to do, so
    // release the port to let the thread exit on its own (reporting the exit code
    // set above) instead of idling until the parent terminates it.
    parentPort?.unref();
  });

View File

@@ -0,0 +1,93 @@
import { Worker } from "node:worker_threads";
import type { TelegramNetworkConfig } from "openclaw/plugin-sdk/config-contracts";
/**
 * Status messages the ingress worker thread posts to its parent.
 * The `type` field discriminates the variants.
 */
export type TelegramIngressWorkerMessage =
| {
// Posted right before a getUpdates request is issued.
type: "poll-start";
// Offset sent with the request; null while no update id is known.
offset: number | null;
// Date.now() timestamp taken when the poll started.
startedAt: number;
}
| {
// Posted after a getUpdates batch has been handled.
type: "poll-success";
// Offset that was used for the request; null while no update id was known.
offset: number | null;
// Number of updates returned by the request.
count: number;
// Date.now() timestamp taken when the batch was finished.
finishedAt: number;
}
| {
// Posted when a poll attempt fails, and when the worker's main loop rejects.
type: "poll-error";
// Formatted error message.
message: string;
// Date.now() timestamp taken when the failure was reported.
finishedAt: number;
}
| {
// Posted after a single update has been written to the spool.
type: "spooled";
// Id of the update that was spooled.
updateId: number;
// Size of the getUpdates batch this update belongs to.
queued: number;
};
/** Configuration passed to the ingress worker thread as its `workerData`. */
export type TelegramIngressWorkerOptions = {
// Bot token; used to build the getUpdates URL and when persisting the update offset.
token: string;
// Account the persisted update offset is stored for.
accountId: string;
// Last update id already handled; polling resumes at this id + 1. Null sends no offset.
initialUpdateId: number | null;
// Directory the worker writes received updates to.
spoolDir: string;
// Bot API root; the worker falls back to "https://api.telegram.org" when omitted.
apiRoot?: string;
// Long-poll timeout in seconds; floored, at least 1, and 30 when omitted or not finite.
timeoutSeconds?: number;
// Network settings forwarded to the transport resolver.
network?: TelegramNetworkConfig;
// Proxy setting; when set, requests go through a proxy-aware fetch.
proxy?: string;
};
/** Control surface for a running ingress worker. */
export type TelegramIngressWorkerHandle = {
// Subscribes to worker messages; the returned function removes the listener again.
onMessage(listener: (message: TelegramIngressWorkerMessage) => void): () => void;
// Requests a stop and resolves once the worker has finished.
stop(): Promise<void>;
// Promise tracking the worker's lifetime.
task(): Promise<void>;
};
/** Creates an ingress worker for the given options and returns its control handle. */
export type TelegramIngressWorkerFactory = (
options: TelegramIngressWorkerOptions,
) => TelegramIngressWorkerHandle;
/**
 * Starts the Telegram ingress runtime in a dedicated worker thread and returns a
 * handle for observing and stopping it.
 *
 * - `onMessage` registers a listener for messages posted by the worker and returns
 *   an unsubscribe function.
 * - `task` returns a promise that resolves when the worker exits with code 0 and
 *   rejects on a worker error or any other exit code.
 * - `stop` posts a "stop" message and waits for the worker to finish, terminating
 *   it if it has not exited after 15 seconds. It never rejects.
 */
export const createTelegramIngressWorker: TelegramIngressWorkerFactory = (options) => {
  const subscribers = new Set<(message: TelegramIngressWorkerMessage) => void>();
  const worker = new Worker(new URL("./telegram-ingress-worker.runtime.js", import.meta.url), {
    workerData: options,
  });

  // Tracks the worker's lifetime; whichever of "error" / "exit" fires first wins.
  const finished = new Promise<void>((resolve, reject) => {
    worker.once("error", reject);
    worker.once("exit", (code) => {
      if (code !== 0) {
        reject(new Error(`Telegram ingress worker exited with code ${code}`));
        return;
      }
      resolve();
    });
  });

  // Fan every worker message out to the current subscribers.
  worker.on("message", (message: TelegramIngressWorkerMessage) => {
    for (const notify of subscribers) {
      notify(message);
    }
  });

  const onMessage: TelegramIngressWorkerHandle["onMessage"] = (listener) => {
    subscribers.add(listener);
    return () => {
      subscribers.delete(listener);
    };
  };

  const stop = async (): Promise<void> => {
    // oxlint-disable-next-line unicorn/require-post-message-target-origin -- Node worker_threads workers do not accept a targetOrigin argument.
    worker.postMessage({ type: "stop" });
    // Safety net: force-terminate a worker that does not shut down by itself.
    const forceKill = setTimeout(() => {
      void worker.terminate();
    }, 15_000);
    forceKill.unref?.();
    try {
      await finished.catch(() => undefined);
    } finally {
      clearTimeout(forceKill);
    }
  };

  return {
    onMessage,
    stop,
    task: () => finished,
  };
};

View File

@@ -0,0 +1 @@
import "./src/telegram-ingress-worker.runtime.js";

View File

@@ -309,6 +309,26 @@ describe("stuck session recovery", () => {
]);
});
it("clears stale queued processing state even when the lane has no active work", async () => {
  // Arrange: no embedded run is active for the session and resetting the lane
  // releases nothing, yet the session still reports queued work.
  mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue(undefined);
  mocks.resolveActiveEmbeddedRunSessionId.mockReturnValue(undefined);
  mocks.isEmbeddedPiRunActive.mockReturnValue(false);
  mocks.resetCommandLane.mockReturnValue(0);

  // Act
  const staleSession = {
    sessionId: "stale-session",
    sessionKey: "agent:main:main",
    ageMs: 180_000,
    queueDepth: 2,
  };
  await recoverStuckDiagnosticSession(staleSession);

  // Assert: the lane is reset and the recovery is logged as a release with released=0.
  expect(mocks.resetCommandLane).toHaveBeenCalledWith("session:agent:main:main");
  const expectedWarnings = [
    "stuck session recovery: sessionId=stale-session sessionKey=agent:main:main age=180s action=release_lane aborted=false drained=true released=0",
    "stuck session recovery outcome: status=released action=release_lane sessionId=stale-session sessionKey=agent:main:main lane=session:agent:main:main released=0",
  ];
  expect(warnLogMessages()).toEqual(expectedWarnings);
});
it("releases a stale session-id lane when no session key is available", async () => {
mocks.isEmbeddedPiRunHandleActive.mockReturnValue(false);
mocks.resetCommandLane.mockReturnValue(1);

View File

@@ -165,7 +165,9 @@ export async function recoverStuckDiagnosticSession(
const released =
sessionLane && (!activeSessionId || !aborted || !drained) ? resetCommandLane(sessionLane) : 0;
if (aborted || released > 0) {
const clearStaleQueuedSession = !aborted && released === 0 && (params.queueDepth ?? 0) > 0;
if (aborted || released > 0 || clearStaleQueuedSession) {
const action = aborted ? "abort_embedded_run" : "release_lane";
const stoppedFields = formatStoppedCronSessionDiagnosticFields(
resolveCronSessionDiagnosticContext({ sessionKey: params.sessionKey, activeSessionId }),