Fix Telegram polling lease cleanup on restart (#81890)

* fix(telegram): release stopped polling leases

* docs: add Telegram polling lease changelog
This commit is contained in:
Josh Avant
2026-05-14 14:04:34 -05:00
committed by GitHub
parent f64feab47a
commit 130c2d5044
5 changed files with 205 additions and 0 deletions

View File

@@ -139,6 +139,7 @@ Docs: https://docs.openclaw.ai
- Codex auth: accept OAuth profiles backed by `oauthRef` during runtime auth selection, so official Codex OAuth logins are used by app-server agent runs. (#81633) Thanks @obviyus.
- Sessions/status: classify ACP spawn-child sessions as `kind: "spawn-child"` instead of `"direct"` in `openclaw sessions` and status output; extract the duplicated session-kind classifier into a shared helper (`src/sessions/classify-session-kind.ts`) so both surfaces stay in sync. Fixes catalog #19. (#79544)
- Sessions/Gateway: report `agentRuntime.id: "acpx"` (or stored backend id) with `source: "session-key"` for ACP control-plane session rows in `openclaw sessions --json`, `openclaw status`, and Gateway session RPC responses instead of the incorrect `"auto"` / `"pi"` implicit fallback. Fixes catalog #18. (#79550)
- Telegram: release stopped polling leases after the gateway stop grace so in-process restarts can reuse the same bot token without weakening active duplicate-poller protection. Fixes #81507. (#81890) Thanks @joshavant.
- Telegram: delete tool-progress-only draft bubbles before rotating to the real answer, preventing orphaned progress messages in streamed replies.
- Codex app-server: keep per-agent `CODEX_HOME` isolation without rewriting `HOME` by default, so Codex-run subprocesses can still find normal user-home config, tokens, and CLI state unless the launch explicitly overrides `HOME`. Thanks @pashpashpash.
- ACP: preserve redacted numeric JSON-RPC `RequestError` details in runtime failure text, so backend diagnostics are visible instead of only `Internal error`. Fixes #81126. (#81188) Thanks @vyctorbrzezowski.

View File

@@ -6,6 +6,10 @@ import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts";
import { afterEach, describe, expect, it, vi } from "vitest";
import { telegramPlugin } from "./channel.js";
import type { TelegramMonitorFn } from "./monitor.types.js";
import {
acquireTelegramPollingLease,
resetTelegramPollingLeasesForTests,
} from "./polling-lease.js";
import { clearTelegramRuntime, setTelegramRuntime } from "./runtime.js";
import type { TelegramProbeFn } from "./runtime.types.js";
import type { TelegramRuntime } from "./runtime.types.js";
@@ -115,6 +119,7 @@ async function waitForCondition(check: () => boolean, message: string, attempts
afterEach(() => {
clearTelegramRuntime();
resetTelegramPollingLeasesForTests();
resetTelegramStartupProbeLimiterForTests();
probeTelegram.mockReset();
monitorTelegramProvider.mockReset();
@@ -333,6 +338,44 @@ describe("telegramPlugin gateway startup", () => {
expect(probeTelegram).toHaveBeenCalledTimes(2);
expect(monitorTelegramProvider).toHaveBeenCalledTimes(2);
});
it("releases a stopped stale polling lease for the account token", async () => {
vi.useFakeTimers();
try {
const cfg = createTelegramConfig();
const account = telegramPlugin.config.resolveAccount(cfg, "default");
const stopAccount = telegramPlugin.gateway?.stopAccount;
if (!stopAccount) {
throw new Error("expected Telegram stopAccount gateway handler");
}
const abort = new AbortController();
await acquireTelegramPollingLease({
token: "123456:bad-token",
accountId: "default",
abortSignal: abort.signal,
});
abort.abort();
const stop = stopAccount(
createStartAccountContext({
account,
abortSignal: abort.signal,
cfg,
}),
);
await vi.advanceTimersByTimeAsync(5_000);
await stop;
const next = await acquireTelegramPollingLease({
token: "123456:bad-token",
accountId: "default",
});
next.release();
} finally {
vi.useRealTimers();
}
});
});
describe("telegramPlugin outbound attachments", () => {

View File

@@ -58,6 +58,7 @@ import * as monitorModule from "./monitor.js";
import { looksLikeTelegramTargetId, normalizeTelegramMessagingTarget } from "./normalize.js";
import { createTelegramOutboundAdapter } from "./outbound-adapter.js";
import { parseTelegramThreadId } from "./outbound-params.js";
import { releaseStoppedTelegramPollingLease } from "./polling-lease.js";
import type { TelegramProbe } from "./probe.js";
import * as probeModule from "./probe.js";
import { resolveTelegramReactionLevel } from "./reaction-level.js";
@@ -953,6 +954,19 @@ export const telegramPlugin = createChatChannelPlugin({
setStatus,
});
},
stopAccount: async ({ account, accountId, log }) => {
const token = (account.token ?? "").trim();
if (!token) {
return;
}
const released = await releaseStoppedTelegramPollingLease({
token,
accountId,
});
if (released) {
log?.info?.(`[${accountId}] released stopped Telegram polling lease`);
}
},
logoutAccount: async ({ accountId, cfg }) => {
const envToken = process.env.TELEGRAM_BOT_TOKEN?.trim() ?? "";
const nextCfg = { ...cfg } as OpenClawConfig;

View File

@@ -1,6 +1,7 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import {
acquireTelegramPollingLease,
releaseStoppedTelegramPollingLease,
resetTelegramPollingLeasesForTests,
} from "./polling-lease.js";
@@ -99,4 +100,114 @@ describe("Telegram polling lease", () => {
vi.useRealTimers();
}
});
it("does not release a no-signal active lease", async () => {
const first = await acquireTelegramPollingLease({
token: "123:abc",
accountId: "default",
});
await expect(
acquireTelegramPollingLease({
token: "123:abc",
accountId: "ops",
}),
).rejects.toThrow('refusing duplicate poller for account "ops"');
await expect(
releaseStoppedTelegramPollingLease({
token: "123:abc",
accountId: "default",
}),
).resolves.toBe(false);
await expect(
acquireTelegramPollingLease({
token: "123:abc",
accountId: "ops",
}),
).rejects.toThrow('account "default"');
first.release();
});
it("does not release a non-aborted active lease", async () => {
const abort = new AbortController();
const first = await acquireTelegramPollingLease({
token: "123:abc",
accountId: "default",
abortSignal: abort.signal,
});
await expect(
releaseStoppedTelegramPollingLease({
token: "123:abc",
accountId: "default",
}),
).resolves.toBe(false);
await expect(
acquireTelegramPollingLease({
token: "123:abc",
accountId: "ops",
}),
).rejects.toThrow('account "default"');
first.release();
});
it("releases an aborted same-account lease after the stop wait elapses", async () => {
vi.useFakeTimers();
try {
const abort = new AbortController();
const first = await acquireTelegramPollingLease({
token: "123:abc",
accountId: "default",
abortSignal: abort.signal,
});
abort.abort();
const release = releaseStoppedTelegramPollingLease({
token: "123:abc",
accountId: "default",
waitMs: 10,
});
await vi.advanceTimersByTimeAsync(10);
await expect(release).resolves.toBe(true);
const next = await acquireTelegramPollingLease({
token: "123:abc",
accountId: "default",
});
next.release();
first.release();
} finally {
vi.useRealTimers();
}
});
it("releases an aborted same-account lease immediately with no stop wait", async () => {
const abort = new AbortController();
const first = await acquireTelegramPollingLease({
token: "123:abc",
accountId: "default",
abortSignal: abort.signal,
});
abort.abort();
await expect(
releaseStoppedTelegramPollingLease({
token: "123:abc",
accountId: "default",
waitMs: 0,
}),
).resolves.toBe(true);
const next = await acquireTelegramPollingLease({
token: "123:abc",
accountId: "default",
});
next.release();
first.release();
});
});

View File

@@ -28,6 +28,12 @@ type AcquireTelegramPollingLeaseOpts = {
waitMs?: number;
};
type ReleaseStoppedTelegramPollingLeaseOpts = {
token: string;
accountId: string;
waitMs?: number;
};
type WaitForPreviousResult = "released" | "timeout" | "aborted";
function pollingLeaseRegistry(): TelegramPollingLeaseRegistry {
@@ -58,6 +64,9 @@ async function waitForPreviousRelease(params: {
if (params.signal?.aborted) {
return "aborted";
}
if (params.waitMs <= 0) {
return "timeout";
}
let timer: ReturnType<typeof setTimeout> | undefined;
let abortListener: (() => void) | undefined;
@@ -184,6 +193,33 @@ export async function acquireTelegramPollingLease(
}
}
export async function releaseStoppedTelegramPollingLease(
opts: ReleaseStoppedTelegramPollingLeaseOpts,
): Promise<boolean> {
const registry = pollingLeaseRegistry();
const fingerprint = fingerprintTelegramBotToken(opts.token);
const existing = registry.get(fingerprint);
if (!existing || existing.accountId !== opts.accountId) {
return false;
}
if (!existing.abortSignal?.aborted) {
return false;
}
const waitResult = await waitForPreviousRelease({
done: existing.done,
waitMs: opts.waitMs ?? DEFAULT_TELEGRAM_POLLING_LEASE_WAIT_MS,
});
if (waitResult === "released" || registry.get(fingerprint) !== existing) {
return false;
}
registry.delete(fingerprint);
existing.resolveDone();
return true;
}
export function resetTelegramPollingLeasesForTests(): void {
pollingLeaseRegistry().clear();
}