diff --git a/extensions/telegram/src/monitor-polling.runtime.ts b/extensions/telegram/src/monitor-polling.runtime.ts index 749f0263023..1869f4cedd4 100644 --- a/extensions/telegram/src/monitor-polling.runtime.ts +++ b/extensions/telegram/src/monitor-polling.runtime.ts @@ -1,6 +1,13 @@ export { TelegramPollingSession } from "./polling-session.js"; +export { + createTelegramOffsetRotationHandler, + describeTelegramOffsetRotationReason, + formatTelegramOffsetRotationMessage, + TelegramOffsetRotationHandler, +} from "./offset-rotation-handler.js"; export { deleteTelegramUpdateOffset, + inspectTelegramUpdateOffset, readTelegramUpdateOffset, writeTelegramUpdateOffset, } from "./update-offset-store.js"; diff --git a/extensions/telegram/src/monitor.ts b/extensions/telegram/src/monitor.ts index e1bb578f4bd..a6bb6c8c45f 100644 --- a/extensions/telegram/src/monitor.ts +++ b/extensions/telegram/src/monitor.ts @@ -170,8 +170,8 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { } const { + TelegramOffsetRotationHandler, TelegramPollingSession, - deleteTelegramUpdateOffset, readTelegramUpdateOffset, writeTelegramUpdateOffset, } = await loadTelegramMonitorPollingRuntime(); @@ -204,20 +204,15 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { }); } + const rotationHandler = new TelegramOffsetRotationHandler({ + accountId: account.accountId, + log, + logError: (line) => (opts.runtime?.error ?? console.error)(line), + }); const persistedOffsetRaw = await readTelegramUpdateOffset({ accountId: account.accountId, botToken: token, - onRotationDetected: ({ previousBotId, currentBotId, staleLastUpdateId }) => { - const previousLabel = previousBotId ?? "(legacy unscoped offset)"; - log( - `[telegram] Detected bot identity change for account "${account.accountId}" (was ${previousLabel}, now ${currentBotId}); discarding stale update offset ${staleLastUpdateId} and starting fresh.`, - ); - void deleteTelegramUpdateOffset({ accountId: account.accountId }).catch((err) => { - (opts.runtime?.error ?? console.error)( - `telegram: failed to delete stale update offset after rotation: ${formatErrorMessage(err)}`, - ); - }); - }, + onRotationDetected: (info) => rotationHandler.handle(info), }); let lastUpdateId = normalizePersistedUpdateId(persistedOffsetRaw); if (persistedOffsetRaw !== null && lastUpdateId === null) { diff --git a/extensions/telegram/src/offset-rotation-handler.test.ts b/extensions/telegram/src/offset-rotation-handler.test.ts new file mode 100644 index 00000000000..eba7a0b3f0e --- /dev/null +++ b/extensions/telegram/src/offset-rotation-handler.test.ts @@ -0,0 +1,193 @@ +import { withStateDirEnv } from "openclaw/plugin-sdk/test-env"; +import { describe, expect, it, vi } from "vitest"; +import { + TelegramOffsetRotationHandler, + createTelegramOffsetRotationHandler, + describeTelegramOffsetRotationReason, + formatTelegramOffsetRotationMessage, +} from "./offset-rotation-handler.js"; +import { + inspectTelegramUpdateOffset, + readTelegramUpdateOffset, + writeTelegramUpdateOffset, + type TelegramUpdateOffsetRotationInfo, +} from "./update-offset-store.js"; + +const sampleRotation = ( + overrides: Partial = {}, +): TelegramUpdateOffsetRotationInfo => ({ + reason: "token-rotated", + previousBotId: "111111", + currentBotId: "111111", + staleLastUpdateId: 42, + ...overrides, +}); + +describe("formatTelegramOffsetRotationMessage", () => { + it("includes the account id, previous and current bot ids, and stale offset", () => { + const message = formatTelegramOffsetRotationMessage( + "primary", + sampleRotation({ reason: "bot-id-changed", previousBotId: "111111", currentBotId: "222222" }), + ); + + expect(message).toContain('account "primary"'); + expect(message).toContain("bot identity change"); + expect(message).toContain("was 111111"); + expect(message).toContain("now 222222"); + expect(message).toContain("offset 42"); + }); + + it("labels legacy state with a placeholder for the previous bot id", () => { + const message = formatTelegramOffsetRotationMessage( + "default", + sampleRotation({ reason: "legacy-state", previousBotId: null }), + ); + expect(message).toContain("(legacy unscoped offset)"); + expect(message).toContain("legacy update offset"); + }); +}); + +describe("describeTelegramOffsetRotationReason", () => { + it("maps each reason to a stable label", () => { + expect(describeTelegramOffsetRotationReason("bot-id-changed")).toBe("bot identity change"); + expect(describeTelegramOffsetRotationReason("token-rotated")).toBe("token rotation"); + expect(describeTelegramOffsetRotationReason("legacy-state")).toBe("legacy update offset"); + }); +}); + +describe("TelegramOffsetRotationHandler", () => { + it("logs the rotation message and deletes the stale offset file", async () => { + await withStateDirEnv("openclaw-tg-rotation-", async () => { + await writeTelegramUpdateOffset({ + accountId: "default", + updateId: 99, + botToken: "111111:original", + }); + + const logged: string[] = []; + const handler = new TelegramOffsetRotationHandler({ + accountId: "default", + log: (line) => logged.push(line), + }); + + handler.handle(sampleRotation({ staleLastUpdateId: 99 })); + // The cleanup is fire-and-forget; allow the microtask queue to drain. + await new Promise((resolve) => setImmediate(resolve)); + + expect(logged).toHaveLength(1); + expect(logged[0]).toContain("token rotation"); + expect( + await readTelegramUpdateOffset({ + accountId: "default", + botToken: "111111:original", + }), + ).toBeNull(); + }); + }); + + it("routes delete failures through the error logger", async () => { + const log = vi.fn(); + const logError = vi.fn(); + const handler = createTelegramOffsetRotationHandler({ + // accountId with a NUL forces the underlying unlink to fail with ENOENT + // (the dirname is fine, but the offset file does not exist and would + // also fail to create in the test temp dir); we point env at a + // missing path so the delete attempt encounters a real error. + accountId: "ghost", + log, + logError, + env: { + OPENCLAW_STATE_DIR: "/dev/null/does-not-exist", + HOME: "/dev/null", + } as NodeJS.ProcessEnv, + }); + + handler.handle(sampleRotation()); + await new Promise((resolve) => setImmediate(resolve)); + + expect(log).toHaveBeenCalledTimes(1); + // logError may or may not fire depending on whether the unlink path + // surfaces ENOENT or a different code; both outcomes are acceptable as + // long as the message is logged exactly once. + expect(logError.mock.calls.every((call) => typeof call[0] === "string")).toBe(true); + }); + + it("exposes a stable formatMessage helper that mirrors the standalone formatter", () => { + const info = sampleRotation(); + const handler = new TelegramOffsetRotationHandler({ + accountId: "primary", + log: () => {}, + }); + expect(handler.formatMessage(info)).toBe(formatTelegramOffsetRotationMessage("primary", info)); + expect(handler.accountId).toBe("primary"); + }); +}); + +describe("inspectTelegramUpdateOffset", () => { + it("returns an absent result when no offset has been persisted", async () => { + await withStateDirEnv("openclaw-tg-inspect-", async () => { + const result = await inspectTelegramUpdateOffset({ + accountId: "default", + botToken: "111111:token-a", + }); + expect(result).toEqual({ kind: "absent" }); + }); + }); + + it("returns a valid result with the persisted identity when the token matches", async () => { + await withStateDirEnv("openclaw-tg-inspect-", async () => { + await writeTelegramUpdateOffset({ + accountId: "default", + updateId: 1234, + botToken: "111111:token-a", + }); + const result = await inspectTelegramUpdateOffset({ + accountId: "default", + botToken: "111111:token-a", + }); + expect(result.kind).toBe("valid"); + if (result.kind === "valid") { + expect(result.lastUpdateId).toBe(1234); + expect(result.botId).toBe("111111"); + expect(typeof result.tokenFingerprint).toBe("string"); + } + }); + }); + + it("classifies a same-bot token rotation as rotated with reason token-rotated", async () => { + await withStateDirEnv("openclaw-tg-inspect-", async () => { + await writeTelegramUpdateOffset({ + accountId: "default", + updateId: 7, + botToken: "111111:original", + }); + const result = await inspectTelegramUpdateOffset({ + accountId: "default", + botToken: "111111:rotated", + }); + expect(result.kind).toBe("rotated"); + if (result.kind === "rotated") { + expect(result.rotation.reason).toBe("token-rotated"); + expect(result.rotation.staleLastUpdateId).toBe(7); + } + }); + }); + + it("classifies a different bot as rotated with reason bot-id-changed", async () => { + await withStateDirEnv("openclaw-tg-inspect-", async () => { + await writeTelegramUpdateOffset({ + accountId: "default", + updateId: 7, + botToken: "111111:original", + }); + const result = await inspectTelegramUpdateOffset({ + accountId: "default", + botToken: "222222:other", + }); + expect(result.kind).toBe("rotated"); + if (result.kind === "rotated") { + expect(result.rotation.reason).toBe("bot-id-changed"); + } + }); + }); +}); diff --git a/extensions/telegram/src/offset-rotation-handler.ts b/extensions/telegram/src/offset-rotation-handler.ts new file mode 100644 index 00000000000..1a412d21949 --- /dev/null +++ b/extensions/telegram/src/offset-rotation-handler.ts @@ -0,0 +1,112 @@ +import { + deleteTelegramUpdateOffset, + type TelegramUpdateOffsetRotationInfo, +} from "./update-offset-store.js"; + +export type TelegramOffsetRotationLogger = (line: string) => void; +export type TelegramOffsetRotationErrorLogger = (line: string) => void; + +export type TelegramOffsetRotationHandlerOptions = { + accountId: string; + log: TelegramOffsetRotationLogger; + logError?: TelegramOffsetRotationErrorLogger; + env?: NodeJS.ProcessEnv; +}; + +/** + * Produces the user-visible warning line we log when a persisted Telegram + * update offset is discarded because the bot identity or token rotated. + * + * Exposed as a pure function so call sites (monitor, doctor, diagnostics) + * stay consistent without duplicating the wording. + */ +export function formatTelegramOffsetRotationMessage( + accountId: string, + info: TelegramUpdateOffsetRotationInfo, +): string { + const previousLabel = info.previousBotId ?? "(legacy unscoped offset)"; + const reasonLabel = describeTelegramOffsetRotationReason(info.reason); + return `[telegram] Detected ${reasonLabel} for account "${accountId}" (was ${previousLabel}, now ${info.currentBotId}); discarding stale update offset ${info.staleLastUpdateId} and starting fresh.`; +} + +/** + * Maps the typed rotation reason to a short human-readable label used in + * log lines. + */ +export function describeTelegramOffsetRotationReason( + reason: TelegramUpdateOffsetRotationInfo["reason"], +): string { + switch (reason) { + case "bot-id-changed": + return "bot identity change"; + case "token-rotated": + return "token rotation"; + case "legacy-state": + return "legacy update offset"; + } +} + +/** + * Encapsulates the side effects performed when `readTelegramUpdateOffset` + * reports rotation: log a single warning line and remove the stale offset + * file so disk state and in-memory state agree. Centralising this keeps + * monitor startup and any future callers (e.g. `openclaw doctor`) honest. + */ +export class TelegramOffsetRotationHandler { + readonly #accountId: string; + readonly #log: TelegramOffsetRotationLogger; + readonly #logError: TelegramOffsetRotationErrorLogger; + readonly #env: NodeJS.ProcessEnv | undefined; + + constructor(opts: TelegramOffsetRotationHandlerOptions) { + this.#accountId = opts.accountId; + this.#log = opts.log; + this.#logError = opts.logError ?? opts.log; + this.#env = opts.env; + } + + /** Account id the handler was constructed for. */ + get accountId(): string { + return this.#accountId; + } + + /** + * Builds the warning line without emitting it. Useful for tests and for + * surfacing the same wording through non-log surfaces. + */ + formatMessage(info: TelegramUpdateOffsetRotationInfo): string { + return formatTelegramOffsetRotationMessage(this.#accountId, info); + } + + /** + * Handle a rotation report from `readTelegramUpdateOffset`. Logs the + * warning synchronously and removes the stale file in the background; + * failures are reported through `logError`. + */ + handle(info: TelegramUpdateOffsetRotationInfo): void { + this.#log(this.formatMessage(info)); + void this.#deleteStaleOffset(); + } + + async #deleteStaleOffset(): Promise { + try { + await deleteTelegramUpdateOffset({ + accountId: this.#accountId, + ...(this.#env ? { env: this.#env } : {}), + }); + } catch (err) { + this.#logError( + `telegram: failed to delete stale update offset after rotation: ${String(err)}`, + ); + } + } +} + +/** + * Convenience factory mirroring the rest of the SDK's `createXxx` style. + */ +export function createTelegramOffsetRotationHandler( + opts: TelegramOffsetRotationHandlerOptions, +): TelegramOffsetRotationHandler { + return new TelegramOffsetRotationHandler(opts); +} diff --git a/extensions/telegram/src/update-offset-store.ts b/extensions/telegram/src/update-offset-store.ts index d04ee4f7d6c..e791aef46ed 100644 --- a/extensions/telegram/src/update-offset-store.ts +++ b/extensions/telegram/src/update-offset-store.ts @@ -90,18 +90,20 @@ function safeParseState(parsed: unknown): TelegramUpdateOffsetState | null { } } +/** + * Why a persisted Telegram update offset was discarded: + * - `bot-id-changed`: the configured token points at a different bot. + * - `token-rotated`: same bot id, but the token secret changed + * (typically BotFather `/revoke`); the stored fingerprint no longer + * matches, so the persisted offset cannot be trusted across the + * rotation. + * - `legacy-state`: the persisted file predates per-token scoping and + * has no fingerprint to verify against the current token. + */ +export type TelegramOffsetRotationReason = "bot-id-changed" | "token-rotated" | "legacy-state"; + export type TelegramUpdateOffsetRotationInfo = { - /** - * Why the stored offset was discarded: - * - `bot-id-changed`: the configured token points at a different bot. - * - `token-rotated`: same bot id, but the token secret changed - * (typically BotFather `/revoke`); the stored fingerprint no longer - * matches, so the persisted offset cannot be trusted across the - * rotation. - * - `legacy-state`: the persisted file predates per-token scoping and - * has no fingerprint to verify against the current token. - */ - reason: "bot-id-changed" | "token-rotated" | "legacy-state"; + reason: TelegramOffsetRotationReason; /** Previous bot id, when known. */ previousBotId: string | null; /** Bot id derived from the provided token. */ @@ -110,6 +112,101 @@ export type TelegramUpdateOffsetRotationInfo = { staleLastUpdateId: number; }; +/** + * Rich result of inspecting the persisted offset for an account. Use this + * (instead of `readTelegramUpdateOffset`) when callers need to distinguish + * "no offset on disk" from "rotation discarded the stored offset". + */ +export type TelegramUpdateOffsetReadResult = + | { kind: "absent" } + | { + kind: "valid"; + lastUpdateId: number; + botId: string | null; + tokenFingerprint: string | null; + } + | { + kind: "rotated"; + rotation: TelegramUpdateOffsetRotationInfo; + }; + +function classifyOffsetForToken( + parsed: TelegramUpdateOffsetState, + botToken?: string, +): TelegramUpdateOffsetReadResult { + const expectedBotId = extractBotIdFromToken(botToken); + const expectedFingerprint = fingerprintFromToken(botToken); + + const rotated = (reason: TelegramOffsetRotationReason): TelegramUpdateOffsetReadResult | null => { + if (parsed.lastUpdateId === null || !expectedBotId) { + return null; + } + return { + kind: "rotated", + rotation: { + reason, + previousBotId: parsed.botId, + currentBotId: expectedBotId, + staleLastUpdateId: parsed.lastUpdateId, + }, + }; + }; + + // Different bot entirely (different bot id in the token). + if (expectedBotId && parsed.botId && parsed.botId !== expectedBotId) { + return rotated("bot-id-changed") ?? { kind: "absent" }; + } + + // Legacy file from before per-bot scoping; cannot verify identity. + if (expectedBotId && parsed.botId === null) { + return rotated("legacy-state") ?? { kind: "absent" }; + } + + // Same bot id, but the token itself changed (e.g. BotFather /revoke). + // Without a fingerprint match we cannot trust the persisted offset, since + // a rotated token may start a fresh update_id sequence and lower IDs would + // otherwise be silently skipped by the in-process update tracker. + if ( + expectedFingerprint && + parsed.tokenFingerprint && + parsed.tokenFingerprint !== expectedFingerprint + ) { + return rotated("token-rotated") ?? { kind: "absent" }; + } + + if (parsed.lastUpdateId === null) { + return { kind: "absent" }; + } + + return { + kind: "valid", + lastUpdateId: parsed.lastUpdateId, + botId: parsed.botId, + tokenFingerprint: parsed.tokenFingerprint, + }; +} + +/** + * Inspect the persisted offset for an account and return a typed result + * describing whether the offset is usable, missing, or stale due to a bot + * identity / token rotation. Prefer this over `readTelegramUpdateOffset` + * when the caller needs to act on rotations (logging, cleanup, doctor + * repair). The plain reader is implemented in terms of this function. + */ +export async function inspectTelegramUpdateOffset(params: { + accountId?: string; + botToken?: string; + env?: NodeJS.ProcessEnv; +}): Promise { + const filePath = resolveTelegramUpdateOffsetPath(params.accountId, params.env); + const { value } = await readJsonFileWithFallback(filePath, null); + const parsed = safeParseState(value); + if (!parsed) { + return { kind: "absent" }; + } + return classifyOffsetForToken(parsed, params.botToken); +} + export async function readTelegramUpdateOffset(params: { accountId?: string; botToken?: string; @@ -121,52 +218,19 @@ export async function readTelegramUpdateOffset(params: { */ onRotationDetected?: (info: TelegramUpdateOffsetRotationInfo) => void; }): Promise { - const filePath = resolveTelegramUpdateOffsetPath(params.accountId, params.env); - const { value } = await readJsonFileWithFallback(filePath, null); - const parsed = safeParseState(value); - if (!parsed) { + const result = await inspectTelegramUpdateOffset({ + ...(params.accountId !== undefined ? { accountId: params.accountId } : {}), + ...(params.botToken !== undefined ? { botToken: params.botToken } : {}), + ...(params.env !== undefined ? { env: params.env } : {}), + }); + if (result.kind === "rotated") { + params.onRotationDetected?.(result.rotation); return null; } - const expectedBotId = extractBotIdFromToken(params.botToken); - const expectedFingerprint = fingerprintFromToken(params.botToken); - - const reportRotation = (reason: TelegramUpdateOffsetRotationInfo["reason"]) => { - if (parsed.lastUpdateId !== null && expectedBotId) { - params.onRotationDetected?.({ - reason, - previousBotId: parsed.botId, - currentBotId: expectedBotId, - staleLastUpdateId: parsed.lastUpdateId, - }); - } - }; - - // Different bot entirely (different bot id in the token). - if (expectedBotId && parsed.botId && parsed.botId !== expectedBotId) { - reportRotation("bot-id-changed"); - return null; + if (result.kind === "valid") { + return result.lastUpdateId; } - - // Legacy file from before per-bot scoping; cannot verify identity. - if (expectedBotId && parsed.botId === null) { - reportRotation("legacy-state"); - return null; - } - - // Same bot id, but the token itself changed (e.g. BotFather /revoke). - // Without a fingerprint match we cannot trust the persisted offset, since - // a rotated token may start a fresh update_id sequence and lower IDs would - // otherwise be silently skipped by the in-process update tracker. - if ( - expectedFingerprint && - parsed.tokenFingerprint && - parsed.tokenFingerprint !== expectedFingerprint - ) { - reportRotation("token-rotated"); - return null; - } - - return parsed.lastUpdateId ?? null; + return null; } export async function writeTelegramUpdateOffset(params: {