Files
openclaw/extensions/telegram/src/monitor.ts
scoootscooob e5bca0832f refactor: move Telegram channel implementation to extensions/ (#45635)
* refactor: move Telegram channel implementation to extensions/telegram/src/

Move all Telegram channel code (123 files + 10 bot/ files + 8 channel plugin
files) from src/telegram/ and src/channels/plugins/*/telegram.ts to
extensions/telegram/src/. Leave thin re-export shims at original locations so
cross-cutting src/ imports continue to resolve.

- Fix all relative import paths in moved files (../X/ -> ../../../src/X/)
- Fix vi.mock paths in 60 test files
- Fix inline typeof import() expressions
- Update tsconfig.plugin-sdk.dts.json rootDir to "." for cross-directory DTS
- Update write-plugin-sdk-entry-dts.ts for new rootDir structure
- Move channel plugin files with correct path remapping

* fix: support keyed telegram send deps

* fix: sync telegram extension copies with latest main

* fix: correct import paths and remove misplaced files in telegram extension

* fix: sync outbound-adapter with main (add sendTelegramPayloadMessages) and fix delivery.test import path
2026-03-14 02:50:17 -07:00

199 lines
6.7 KiB
TypeScript

import type { RunOptions } from "@grammyjs/runner";
import { resolveAgentMaxConcurrent } from "../../../src/config/agent-limits.js";
import type { OpenClawConfig } from "../../../src/config/config.js";
import { loadConfig } from "../../../src/config/config.js";
import { waitForAbortSignal } from "../../../src/infra/abort-signal.js";
import { formatErrorMessage } from "../../../src/infra/errors.js";
import { registerUnhandledRejectionHandler } from "../../../src/infra/unhandled-rejections.js";
import type { RuntimeEnv } from "../../../src/runtime.js";
import { resolveTelegramAccount } from "./accounts.js";
import { resolveTelegramAllowedUpdates } from "./allowed-updates.js";
import { TelegramExecApprovalHandler } from "./exec-approvals-handler.js";
import {
isRecoverableTelegramNetworkError,
isTelegramPollingNetworkError,
} from "./network-errors.js";
import { TelegramPollingSession } from "./polling-session.js";
import { makeProxyFetch } from "./proxy.js";
import { readTelegramUpdateOffset, writeTelegramUpdateOffset } from "./update-offset-store.js";
import { startTelegramWebhook } from "./webhook.js";
/**
 * Options accepted by `monitorTelegramProvider`. Every field is optional;
 * the notes below describe how `monitorTelegramProvider` consumes each one.
 */
export type MonitorTelegramOpts = {
  // ---- Account and environment ----

  /** Account id handed to `resolveTelegramAccount`. */
  accountId?: string;
  /** Bot token; when non-empty after trimming it is used instead of the account's token. */
  token?: string;
  /** Configuration to use; when omitted, `loadConfig()` is called. */
  config?: OpenClawConfig;
  /** Runtime environment; its `error` function is used for logging, else `console.error`. */
  runtime?: RuntimeEnv;
  /** Abort signal forwarded to the webhook / polling session. */
  abortSignal?: AbortSignal;
  /** Fetch implementation to use; when omitted, one is built from the account's `proxy` setting if present. */
  proxyFetch?: typeof fetch;

  // ---- Webhook mode ----

  /** When truthy, the webhook is started instead of a polling session. */
  useWebhook?: boolean;
  /** Forwarded to `startTelegramWebhook` as `publicUrl`. */
  webhookUrl?: string;
  /** Forwarded to `startTelegramWebhook` as `host`; falls back to the account's `webhookHost`. */
  webhookHost?: string;
  /** Forwarded to `startTelegramWebhook` as `port`. */
  webhookPort?: number;
  /** Forwarded to `startTelegramWebhook` as `path`. */
  webhookPath?: string;
  /** Forwarded to `startTelegramWebhook` as `secret`; falls back to the account's `webhookSecret`. */
  webhookSecret?: string;
  /** Forwarded to `startTelegramWebhook` as `webhookCertPath`. */
  webhookCertPath?: string;
};
/**
 * Builds the grammY runner options used for Telegram polling.
 *
 * @param cfg - Configuration from which the sink concurrency is resolved
 *   (via `resolveAgentMaxConcurrent`).
 * @returns Runner options with the sink concurrency, `getUpdates` fetch
 *   settings, and retry behaviour filled in.
 */
export function createTelegramRunnerOptions(cfg: OpenClawConfig): RunOptions<unknown> {
  const sinkOptions = {
    concurrency: resolveAgentMaxConcurrent(cfg),
  };

  const fetchOptions = {
    // Match grammY defaults
    timeout: 30,
    // Request reactions without dropping default update types.
    allowed_updates: resolveTelegramAllowedUpdates(),
  };

  // Keep grammY retrying for a long outage window. If polling still
  // stops, the outer monitor loop restarts it with backoff.
  const longOutageRetryWindow = 60 * 60 * 1000;

  return {
    sink: sinkOptions,
    runner: {
      fetch: fetchOptions,
      // Suppress grammY getUpdates stack traces; we log concise errors ourselves.
      silent: true,
      maxRetryTime: longOutageRetryWindow,
      retryInterval: "exponential",
    },
  };
}
/**
 * Validates an update id before it is used or persisted.
 *
 * @param value - Candidate update id, or `null` when none is available.
 * @returns `value` unchanged when it is a non-negative safe integer;
 *   otherwise `null` (this includes `null`, NaN, infinities, fractions,
 *   negatives, and integers beyond the safe range).
 */
function normalizePersistedUpdateId(value: number | null): number | null {
  const isUsable = value !== null && Number.isSafeInteger(value) && value >= 0;
  return isUsable ? value : null;
}
/**
 * Reports whether `err` looks like a Grammy HttpError (used to scope
 * unhandled rejection handling).
 *
 * The check is structural: any non-null object whose `name` property equals
 * "HttpError" matches. Primitives, functions, `null` and `undefined` do not.
 */
const isGrammyHttpError = (err: unknown): boolean => {
  if (typeof err !== "object" || err === null) {
    return false;
  }
  const { name } = err as { name?: string };
  return name === "HttpError";
};
/**
 * Runs the Telegram monitor for a single account, in webhook or polling mode.
 *
 * Steps, in order:
 *  1. Register an unhandled-rejection handler scoped to Telegram polling
 *     network errors.
 *  2. Resolve config, account and bot token (throws if no token is found).
 *  3. Start the exec-approvals handler.
 *  4. Read and validate the persisted update offset.
 *  5. Either start the webhook and wait for the abort signal, or run a
 *     polling session until abort.
 *
 * On exit (normal or by exception) the exec-approvals handler is stopped
 * (stop errors are ignored) and the rejection handler is unregistered.
 *
 * @param opts - Monitor options; all fields optional (defaults to `{}`).
 * @throws Error when neither `opts.token` nor the resolved account provides a
 *   bot token. Errors from the awaited helpers propagate as well.
 */
export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
// Error logger: the runtime's `error` function when provided, else console.error.
const log = opts.runtime?.error ?? console.error;
// Both are assigned inside the try block; declared here so the rejection
// handler closure and the finally block can see them.
let pollingSession: TelegramPollingSession | undefined;
let execApprovalsHandler: TelegramExecApprovalHandler | undefined;
// Registered before the try so the finally block always unregisters it.
// NOTE(review): the boolean return presumably tells the registry whether the
// rejection was handled — confirm against registerUnhandledRejectionHandler.
const unregisterHandler = registerUnhandledRejectionHandler((err) => {
const isNetworkError = isRecoverableTelegramNetworkError(err, { context: "polling" });
const isTelegramPollingError = isTelegramPollingNetworkError(err);
// Case 1: a Grammy HttpError that is also a recoverable polling network
// error is only logged; no restart is triggered from here.
if (isGrammyHttpError(err) && isNetworkError && isTelegramPollingError) {
log(`[telegram] Suppressed network error: ${formatErrorMessage(err)}`);
return true;
}
// Case 2: any other recoverable polling network error while a runner is
// active: flag the session as force-restarted, abort the in-flight fetch and
// stop the runner (stop failures are ignored).
// NOTE(review): the actual restart presumably happens inside
// TelegramPollingSession — confirm there.
const activeRunner = pollingSession?.activeRunner;
if (isNetworkError && isTelegramPollingError && activeRunner && activeRunner.isRunning()) {
pollingSession?.markForceRestarted();
pollingSession?.abortActiveFetch();
void activeRunner.stop().catch(() => {});
log(
`[telegram] Restarting polling after unhandled network error: ${formatErrorMessage(err)}`,
);
return true;
}
// Anything else is not claimed by this handler.
return false;
});
try {
const cfg = opts.config ?? loadConfig();
const account = resolveTelegramAccount({
cfg,
accountId: opts.accountId,
});
// `||` (not `??`) on purpose: an empty or whitespace-only opts.token falls
// back to the account token.
const token = opts.token?.trim() || account.token;
if (!token) {
throw new Error(
`Telegram bot token missing for account "${account.accountId}" (set channels.telegram.accounts.${account.accountId}.botToken/tokenFile or TELEGRAM_BOT_TOKEN for default).`,
);
}
// An explicit proxyFetch wins; otherwise build one from the account's proxy
// setting, or leave it undefined when no proxy is configured.
const proxyFetch =
opts.proxyFetch ?? (account.config.proxy ? makeProxyFetch(account.config.proxy) : undefined);
execApprovalsHandler = new TelegramExecApprovalHandler({
token,
accountId: account.accountId,
cfg,
runtime: opts.runtime,
});
await execApprovalsHandler.start();
const persistedOffsetRaw = await readTelegramUpdateOffset({
accountId: account.accountId,
botToken: token,
});
// Highest update id seen so far; null when nothing valid was persisted.
let lastUpdateId = normalizePersistedUpdateId(persistedOffsetRaw);
// A stored value that failed validation is reported and then ignored.
if (persistedOffsetRaw !== null && lastUpdateId === null) {
log(
`[telegram] Ignoring invalid persisted update offset (${String(persistedOffsetRaw)}); starting without offset confirmation.`,
);
}
// Persists an update id only when it is valid and strictly greater than the
// last one recorded. Write failures are logged, never thrown.
const persistUpdateId = async (updateId: number) => {
const normalizedUpdateId = normalizePersistedUpdateId(updateId);
if (normalizedUpdateId === null) {
log(`[telegram] Ignoring invalid update_id value: ${String(updateId)}`);
return;
}
// Skip ids that do not advance the offset.
if (lastUpdateId !== null && normalizedUpdateId <= lastUpdateId) {
return;
}
// The in-memory value is advanced before the write, so it stays advanced
// even if the write below fails.
lastUpdateId = normalizedUpdateId;
try {
await writeTelegramUpdateOffset({
accountId: account.accountId,
updateId: normalizedUpdateId,
botToken: token,
});
} catch (err) {
// NOTE(review): this resolves the same logger as `log` above, but the
// message uses a "telegram:" prefix rather than "[telegram]".
(opts.runtime?.error ?? console.error)(
`telegram: failed to persist update offset: ${String(err)}`,
);
}
};
// Webhook mode: start the webhook, block until the abort signal, then return.
// The polling session and persistUpdateId are not used on this path.
if (opts.useWebhook) {
await startTelegramWebhook({
token,
accountId: account.accountId,
config: cfg,
path: opts.webhookPath,
port: opts.webhookPort,
secret: opts.webhookSecret ?? account.config.webhookSecret,
host: opts.webhookHost ?? account.config.webhookHost,
// NOTE(review): opts.runtime is optional, so this cast can pass undefined
// typed as RuntimeEnv — confirm startTelegramWebhook tolerates that.
runtime: opts.runtime as RuntimeEnv,
fetch: proxyFetch,
abortSignal: opts.abortSignal,
publicUrl: opts.webhookUrl,
webhookCertPath: opts.webhookCertPath,
});
await waitForAbortSignal(opts.abortSignal);
return;
}
// Polling mode: the session reads the current offset through getLastUpdateId
// and reports new ids through persistUpdateId.
pollingSession = new TelegramPollingSession({
token,
config: cfg,
accountId: account.accountId,
runtime: opts.runtime,
proxyFetch,
abortSignal: opts.abortSignal,
runnerOptions: createTelegramRunnerOptions(cfg),
getLastUpdateId: () => lastUpdateId,
persistUpdateId,
log,
});
await pollingSession.runUntilAbort();
} finally {
// Teardown runs on every exit path; a rejected stop() is ignored so the
// rejection handler is still unregistered.
await execApprovalsHandler?.stop().catch(() => {});
unregisterHandler();
}
}