fix: surface stalled telegram ingress backlog

This commit is contained in:
Peter Steinberger
2026-05-15 20:35:34 +01:00
parent a6dd9fdf08
commit 25a8f5f3f8
6 changed files with 450 additions and 20 deletions

View File

@@ -34,6 +34,7 @@ Docs: https://docs.openclaw.ai
- Control UI/WebChat: keep optimistic image messages from embedding large inline `data:` previews and preserve image-only user turns in chat history, avoiding browser stack overflows when sending image attachments. Fixes #82182. Thanks @ExploreSheep.
- Agents/media: preserve message-tool-only delivery for generated music and video completion handoffs, so group/channel completions do not finish without posting the generated attachment.
- Telegram: drain queued outbound deliveries after polling reconnect confirms fresh `getUpdates` activity, so stale-socket and network recovery do not leave failed replies stranded. Fixes #50040. Refs #82175. Thanks @dmitriiforpost-commits and @shellyrocklobster.
- Telegram: mark isolated polling ingress unhealthy when a spooled inbound backlog stalls while Bot API polling still succeeds, so gateway/channel health no longer stays green after Telegram DM processing wedges. Fixes #82175. Thanks @shellyrocklobster.
- Agents: strip Gemini/Gemma `<final>` tags with attributes or self-closing syntax from delivered replies, including strict final-tag streaming enforcement. Fixes #65867.
- macOS/update: disarm legacy `ai.openclaw.update.*` LaunchAgents when `openclaw update` starts from one, preventing KeepAlive relaunch loops that repeatedly restart the Gateway and replay update continuations. Fixes #82167.
- Agents/replay: strip internal runtime-context metadata and `NO_REPLY` sentinels from provider replay and pending final-delivery recovery so restart and heartbeat resumes do not feed control text back to the model. Fixes #76629. Thanks @fuyizheng3120, @bryan-chx, and @cael-dandelion-cult.

View File

@@ -71,6 +71,12 @@ type DrainPendingDeliveriesCall = {
now: number,
) => { match: boolean; bypassBackoff: boolean };
};
type WorkerPollSuccessListener = (message: {
type: "poll-success";
offset: null;
count: number;
finishedAt: number;
}) => void;
type AsyncVoidFn = () => Promise<void>;
type MockCallSource = { mock: { calls: Array<Array<unknown>> } };
@@ -882,6 +888,309 @@ describe("TelegramPollingSession", () => {
}
});
it("keeps active spooled lanes blocked across isolated ingress restarts", async () => {
vi.useFakeTimers({ shouldAdvanceTime: true });
const abort = new AbortController();
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-telegram-spool-"));
let releaseRegularTurn: (() => void) | undefined;
const regularTurnDone = new Promise<void>((resolve) => {
releaseRegularTurn = resolve;
});
const handleUpdate = vi.fn(async () => {
await regularTurnDone;
});
createTelegramBotMock.mockImplementation(() => ({
api: {
deleteWebhook: vi.fn(async () => true),
config: { use: vi.fn() },
},
init: vi.fn(async () => undefined),
handleUpdate,
stop: vi.fn(async () => undefined),
}));
await writeTelegramSpooledUpdate({
spoolDir: tempDir,
update: {
update_id: 42,
message: { text: "summarize this", chat: { id: -100, type: "supergroup" } },
},
});
let workerTaskCalls = 0;
let stopWorker: (() => void) | undefined;
const workerDone = new Promise<void>((resolve) => {
stopWorker = resolve;
});
const createWorker = vi.fn(() => ({
onMessage: vi.fn(() => () => undefined),
stop: vi.fn(async () => {
stopWorker?.();
}),
task: vi.fn(async () => {
workerTaskCalls += 1;
if (workerTaskCalls === 1) {
return;
}
await workerDone;
}),
}));
try {
const session = createPollingSession({
abortSignal: abort.signal,
isolatedIngress: {
enabled: true,
spoolDir: tempDir,
createWorker,
drainIntervalMs: 100,
},
});
const runPromise = session.runUntilAbort();
await vi.waitFor(() => expect(handleUpdate).toHaveBeenCalledTimes(1));
await vi.advanceTimersByTimeAsync(16_000);
await vi.waitFor(() => expect(createWorker).toHaveBeenCalledTimes(2));
expect(handleUpdate).toHaveBeenCalledTimes(1);
releaseRegularTurn?.();
await vi.advanceTimersByTimeAsync(1_000);
await vi.waitFor(async () =>
expect(
(await listTelegramSpooledUpdates({ spoolDir: tempDir })).map(
(update) => update.updateId,
),
).toEqual([]),
);
abort.abort();
await vi.advanceTimersByTimeAsync(20_000);
await runPromise;
} finally {
releaseRegularTurn?.();
vi.useRealTimers();
await fs.rm(tempDir, { recursive: true, force: true });
}
});
it("keeps active spooled lanes blocked across account restarts", async () => {
vi.useFakeTimers({ shouldAdvanceTime: true });
const firstAbort = new AbortController();
const secondAbort = new AbortController();
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-telegram-spool-"));
let releaseRegularTurn: (() => void) | undefined;
const regularTurnDone = new Promise<void>((resolve) => {
releaseRegularTurn = resolve;
});
const handleUpdate = vi.fn(async () => {
await regularTurnDone;
});
createTelegramBotMock.mockImplementation(() => ({
api: {
deleteWebhook: vi.fn(async () => true),
config: { use: vi.fn() },
},
init: vi.fn(async () => undefined),
handleUpdate,
stop: vi.fn(async () => undefined),
}));
await writeTelegramSpooledUpdate({
spoolDir: tempDir,
update: {
update_id: 42,
message: { text: "summarize this", chat: { id: -100, type: "supergroup" } },
},
});
const createWorker = vi.fn(() => {
let stopWorker: (() => void) | undefined;
const workerDone = new Promise<void>((resolve) => {
stopWorker = resolve;
});
return {
onMessage: vi.fn(() => () => undefined),
stop: vi.fn(async () => {
stopWorker?.();
}),
task: vi.fn(async () => {
await workerDone;
}),
};
});
try {
const firstSession = createPollingSession({
abortSignal: firstAbort.signal,
isolatedIngress: {
enabled: true,
spoolDir: tempDir,
createWorker,
drainIntervalMs: 100,
},
});
const firstRunPromise = firstSession.runUntilAbort();
await vi.waitFor(() => expect(handleUpdate).toHaveBeenCalledTimes(1));
firstAbort.abort();
await vi.advanceTimersByTimeAsync(16_000);
await firstRunPromise;
const secondSession = createPollingSession({
abortSignal: secondAbort.signal,
isolatedIngress: {
enabled: true,
spoolDir: tempDir,
createWorker,
drainIntervalMs: 100,
},
});
const secondRunPromise = secondSession.runUntilAbort();
await vi.waitFor(() => expect(createWorker).toHaveBeenCalledTimes(2));
await vi.advanceTimersByTimeAsync(1_000);
expect(handleUpdate).toHaveBeenCalledTimes(1);
releaseRegularTurn?.();
await vi.advanceTimersByTimeAsync(1_000);
await vi.waitFor(async () =>
expect(
(await listTelegramSpooledUpdates({ spoolDir: tempDir })).map(
(update) => update.updateId,
),
).toEqual([]),
);
secondAbort.abort();
await vi.advanceTimersByTimeAsync(20_000);
await secondRunPromise;
} finally {
releaseRegularTurn?.();
vi.useRealTimers();
await fs.rm(tempDir, { recursive: true, force: true });
}
});
it("marks isolated ingress unhealthy when a spooled backlog wedges while polling stays live", async () => {
vi.useFakeTimers({ shouldAdvanceTime: true });
const abort = new AbortController();
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-telegram-spool-"));
const log = vi.fn();
const setStatus = vi.fn();
let releaseRegularTurn: (() => void) | undefined;
const regularTurnDone = new Promise<void>((resolve) => {
releaseRegularTurn = resolve;
});
const handleUpdate = vi.fn(async () => {
await regularTurnDone;
});
createTelegramBotMock.mockImplementation(() => ({
api: {
deleteWebhook: vi.fn(async () => true),
config: { use: vi.fn() },
},
init: vi.fn(async () => undefined),
handleUpdate,
stop: vi.fn(async () => undefined),
}));
for (const updateId of [42, 43]) {
await writeTelegramSpooledUpdate({
spoolDir: tempDir,
update: {
update_id: updateId,
message: { text: `dm ${updateId}`, chat: { id: 123, type: "private" } },
},
});
}
const workerListeners: WorkerPollSuccessListener[] = [];
const createWorker = vi.fn(() => {
let stopWorker: (() => void) | undefined;
const workerDone = new Promise<void>((resolve) => {
stopWorker = resolve;
});
return {
onMessage: vi.fn((listener: WorkerPollSuccessListener) => {
workerListeners.push(listener);
return () => undefined;
}),
stop: vi.fn(async () => {
stopWorker?.();
}),
task: vi.fn(async () => {
await workerDone;
}),
};
});
try {
const session = createPollingSession({
abortSignal: abort.signal,
log,
setStatus,
isolatedIngress: {
enabled: true,
spoolDir: tempDir,
createWorker,
drainIntervalMs: 100,
},
});
const runPromise = session.runUntilAbort();
await vi.waitFor(() => expect(handleUpdate).toHaveBeenCalledTimes(1));
workerListeners[0]?.({
type: "poll-success",
offset: null,
count: 0,
finishedAt: Date.now(),
});
expect(statusPatches(setStatus).some((patch) => patch.connected === true)).toBe(true);
await vi.advanceTimersByTimeAsync(25 * 60_000 + 100);
await vi.waitFor(() =>
expect(log).toHaveBeenCalledWith(
expect.stringContaining("isolated polling spool backlog stalled"),
),
);
expect(
statusPatches(setStatus).some(
(patch) =>
patch.connected === false &&
String(patch.lastError).includes("isolated polling spool backlog stalled"),
),
).toBe(true);
workerListeners[0]?.({
type: "poll-success",
offset: null,
count: 0,
finishedAt: Date.now(),
});
expect(statusPatches(setStatus).at(-1)?.connected).toBe(false);
releaseRegularTurn?.();
await vi.advanceTimersByTimeAsync(1_000);
await vi.waitFor(async () =>
expect(
(await listTelegramSpooledUpdates({ spoolDir: tempDir })).map(
(update) => update.updateId,
),
).toEqual([]),
);
workerListeners[0]?.({
type: "poll-success",
offset: null,
count: 0,
finishedAt: Date.now(),
});
await vi.waitFor(() => expect(statusPatches(setStatus).at(-1)?.connected).toBe(true));
expect(createWorker).toHaveBeenCalledTimes(1);
abort.abort();
await vi.advanceTimersByTimeAsync(20_000);
await runPromise;
} finally {
releaseRegularTurn?.();
vi.useRealTimers();
await fs.rm(tempDir, { recursive: true, force: true });
}
});
it("forces a restart when polling stalls without getUpdates activity", async () => {
const abort = new AbortController();
const botStop = vi.fn(async () => undefined);

View File

@@ -41,6 +41,7 @@ const MIN_POLL_STALL_THRESHOLD_MS = 30_000;
const MAX_POLL_STALL_THRESHOLD_MS = 600_000;
const POLL_WATCHDOG_INTERVAL_MS = 30_000;
const POLL_STOP_GRACE_MS = 15_000;
const ISOLATED_INGRESS_BACKLOG_STALL_MS = 25 * 60_000;
const TELEGRAM_POLLING_CLIENT_TIMEOUT_FLOOR_SECONDS = Math.ceil(
TELEGRAM_GET_UPDATES_REQUEST_TIMEOUT_MS / 1000,
);
@@ -109,13 +110,34 @@ type TelegramPollingSessionOpts = {
};
};
type SpooledUpdateHandlerState = {
handlerKey: string;
laneKey: string;
task: Promise<boolean>;
updateId: number;
startedAt: number;
};
type SpooledUpdateDrainResult = {
blockedByLane: Set<string>;
started: number;
};
// Account health restarts create a new session in the same process while an old
// spooled handler may still be running after shutdown grace.
const activeSpooledUpdateHandlersByLane = new Map<string, SpooledUpdateHandlerState>();
function buildSpooledUpdateHandlerKey(params: { spoolDir: string; laneKey: string }): string {
return `${params.spoolDir}\0${params.laneKey}`;
}
export class TelegramPollingSession {
#restartAttempts = 0;
#webhookCleared = false;
#forceRestarted = false;
#activeRunner: ReturnType<typeof run> | undefined;
#activeFetchAbort: AbortController | undefined;
#spooledUpdateHandlersByLane = new Map<string, Promise<boolean>>();
#spooledUpdateHandlerKeys = new Set<string>();
#transportState: TelegramPollingTransportState;
#status: ReturnType<typeof createTelegramPollingStatusPublisher>;
#stallThresholdMs: number;
@@ -315,11 +337,19 @@ export class TelegramPollingSession {
}
async #waitForSpooledUpdateHandlers(): Promise<void> {
await Promise.allSettled(this.#spooledUpdateHandlersByLane.values());
await Promise.allSettled(
[...this.#spooledUpdateHandlerKeys]
.map((handlerKey) => activeSpooledUpdateHandlersByLane.get(handlerKey)?.task)
.filter((task): task is Promise<boolean> => Boolean(task)),
);
}
async #drainSpooledUpdates(params: { bot: TelegramBot; spoolDir: string }): Promise<number> {
async #drainSpooledUpdates(params: {
bot: TelegramBot;
spoolDir: string;
}): Promise<SpooledUpdateDrainResult> {
const updates = await listTelegramSpooledUpdates({ spoolDir: params.spoolDir, limit: 100 });
const blockedByLane = new Set<string>();
let started = 0;
for (const update of updates) {
const laneKey = getTelegramSequentialKey({
@@ -329,20 +359,54 @@ export class TelegramPollingSession {
if (this.opts.abortSignal?.aborted) {
break;
}
if (this.#spooledUpdateHandlersByLane.has(laneKey)) {
const handlerKey = buildSpooledUpdateHandlerKey({ spoolDir: params.spoolDir, laneKey });
if (activeSpooledUpdateHandlersByLane.has(handlerKey)) {
blockedByLane.add(handlerKey);
continue;
}
const handler = this.#handleSpooledUpdate({
bot: params.bot,
update,
});
this.#spooledUpdateHandlersByLane.set(laneKey, handler);
const state: SpooledUpdateHandlerState = {
handlerKey,
laneKey,
task: handler,
updateId: update.updateId,
startedAt: Date.now(),
};
activeSpooledUpdateHandlersByLane.set(handlerKey, state);
this.#spooledUpdateHandlerKeys.add(handlerKey);
void handler.finally(() => {
this.#spooledUpdateHandlersByLane.delete(laneKey);
if (activeSpooledUpdateHandlersByLane.get(handlerKey) === state) {
activeSpooledUpdateHandlersByLane.delete(handlerKey);
}
this.#spooledUpdateHandlerKeys.delete(handlerKey);
});
started += 1;
}
return started;
return { blockedByLane, started };
}
#detectStaleSpooledHandler(
blockedHandlerKeys: Set<string>,
): (SpooledUpdateHandlerState & { ageMs: number }) | null {
const now = Date.now();
let stale: (SpooledUpdateHandlerState & { ageMs: number }) | null = null;
for (const handlerKey of blockedHandlerKeys) {
const handler = activeSpooledUpdateHandlersByLane.get(handlerKey);
if (!handler) {
continue;
}
const ageMs = now - handler.startedAt;
if (ageMs <= ISOLATED_INGRESS_BACKLOG_STALL_MS) {
continue;
}
if (!stale || ageMs > stale.ageMs) {
stale = { ...handler, ageMs };
}
}
return stale;
}
async #runIsolatedIngressCycle(bot: TelegramBot): Promise<"continue" | "exit"> {
@@ -384,6 +448,7 @@ export class TelegramPollingSession {
outcome: "not-started",
};
let consecutiveDrainFailures = 0;
const stalledBacklogKeys = new Set<string>();
const unsubscribe = worker.onMessage((message) => {
if (message.type === "poll-start") {
pollState.startedAt = message.startedAt;
@@ -393,7 +458,9 @@ export class TelegramPollingSession {
return;
}
if (message.type === "poll-success") {
this.#status.notePollSuccess(message.finishedAt);
if (stalledBacklogKeys.size === 0) {
this.#status.notePollSuccess(message.finishedAt);
}
this.#drainPendingDeliveriesAfterReconnect();
pollState.outcome = `ok:${message.count}`;
return;
@@ -409,14 +476,38 @@ export class TelegramPollingSession {
this.opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true });
const drainIntervalMs = Math.max(100, Math.floor(ingress.drainIntervalMs ?? 500));
let drainActive = false;
const stopBot = () => {
return Promise.resolve(bot.stop())
.then(() => undefined)
.catch(() => {
// Bot may already be stopped by shutdown paths.
});
};
const drainOnce = async () => {
if (drainActive || this.opts.abortSignal?.aborted) {
return;
}
drainActive = true;
try {
await this.#drainSpooledUpdates({ bot, spoolDir });
const drain = await this.#drainSpooledUpdates({ bot, spoolDir });
consecutiveDrainFailures = 0;
for (const handlerKey of [...stalledBacklogKeys]) {
if (
!activeSpooledUpdateHandlersByLane.has(handlerKey) ||
!drain.blockedByLane.has(handlerKey)
) {
stalledBacklogKeys.delete(handlerKey);
}
}
const staleHandler = this.#detectStaleSpooledHandler(drain.blockedByLane);
if (staleHandler) {
if (!stalledBacklogKeys.has(staleHandler.handlerKey)) {
stalledBacklogKeys.add(staleHandler.handlerKey);
const message = `Telegram isolated polling spool backlog stalled behind update ${staleHandler.updateId} on lane ${staleHandler.laneKey} for ${formatDurationPrecise(staleHandler.ageMs)}; marking polling unhealthy until the backlog drains.`;
this.opts.log(`[telegram] ${message}`);
this.#status.notePollingError(message);
}
}
} catch (err) {
consecutiveDrainFailures += 1;
this.opts.log(
@@ -431,13 +522,6 @@ export class TelegramPollingSession {
void drainOnce();
}, drainIntervalMs);
drainTimer.unref?.();
const stopBot = () => {
return Promise.resolve(bot.stop())
.then(() => undefined)
.catch(() => {
// Bot may already be stopped by shutdown paths.
});
};
try {
await worker.task();
if (this.opts.abortSignal?.aborted) {

View File

@@ -27,6 +27,13 @@ export function createTelegramPollingStatusPublisher(setStatus?: TelegramPolling
lastError: null,
});
},
notePollingError(error: string) {
setStatus?.({
mode: "polling",
connected: false,
lastError: error,
});
},
notePollingStop() {
setStatus?.({
mode: "polling",

View File

@@ -69,6 +69,10 @@ function appendTelegramRuntimeError(message: string, lastError: unknown): string
return error ? `${message}: ${error}` : message;
}
function isTelegramPollingBacklogStallError(lastError: unknown): boolean {
return Boolean(asString(lastError)?.includes("isolated polling spool backlog stalled"));
}
function collectTelegramPollingRuntimeIssues(params: {
account: TelegramAccountStatus;
accountId: string;
@@ -88,14 +92,14 @@ function collectTelegramPollingRuntimeIssues(params: {
const withinStartupGrace =
lastStartAt != null && now - lastStartAt < TELEGRAM_POLLING_CONNECT_GRACE_MS;
if (!withinStartupGrace) {
const message = isTelegramPollingBacklogStallError(account.lastError)
? "Telegram isolated polling spool backlog is stalled while Bot API polling is still succeeding"
: "Telegram polling is running but has not completed a successful getUpdates call since startup";
issues.push({
channel: "telegram",
accountId,
kind: "runtime",
message: appendTelegramRuntimeError(
"Telegram polling is running but has not completed a successful getUpdates call since startup",
account.lastError,
),
message: appendTelegramRuntimeError(message, account.lastError),
fix,
});
}

View File

@@ -119,6 +119,31 @@ describe("collectTelegramStatusIssues", () => {
expect(issues[0]?.fix).toContain("channels status --probe");
});
it("reports isolated polling spool backlog stalls distinctly from startup failures", () => {
const issues = collectTelegramStatusIssues([
{
accountId: "main",
enabled: true,
configured: true,
running: true,
mode: "polling",
connected: false,
lastStartAt: Date.now() - 121_000,
lastError:
"Telegram isolated polling spool backlog stalled behind update 42 on lane telegram:123 for 1500100ms; marking polling unhealthy until the backlog drains.",
} as ChannelAccountSnapshot,
]);
expect(issues).toHaveLength(1);
expectIssueFields(issues[0], {
channel: "telegram",
accountId: "main",
kind: "runtime",
});
expect(issues[0]?.message).toContain("spool backlog is stalled");
expect(issues[0]?.message).not.toContain("has not completed a successful getUpdates call");
});
it("does not report polling startup before the connect grace expires", () => {
const issues = collectTelegramStatusIssues([
{