From 76cd97289b790fdbb49792c65ff48b01384872ec Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Tue, 28 Apr 2026 01:50:44 -0700 Subject: [PATCH] fix(cron): support Telegram thread IDs in cron add/edit - Add `--thread-id` support to cron add/edit Telegram delivery. - Reject non-positive thread IDs and guard cron edit lookup pagination against non-progress/max-page loops. - Preserve existing delivery mode on thread-only cron edit patches. Carries forward #51581, #60373, and #60890. Co-authored-by: ChunHao Chen --- CHANGELOG.md | 1 + docs/cli/cron.md | 6 + src/cli/cron-cli.test.ts | 224 ++++++++++++++++++++++++- src/cli/cron-cli/register.cron-add.ts | 18 +- src/cli/cron-cli/register.cron-edit.ts | 62 ++++++- src/cli/cron-cli/thread-id-shared.ts | 35 ++++ 6 files changed, 334 insertions(+), 12 deletions(-) create mode 100644 src/cli/cron-cli/thread-id-shared.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e01dfe8f02..3798d71abd9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ Docs: https://docs.openclaw.ai - Plugins/startup: precompute bundled runtime mirror fingerprints before taking the mirror lock and keep Docker bundled plugin runtime deps/mirrors in a Docker-managed volume instead of the Windows/WSL config bind mount, so cold starts avoid slow host-volume mirror writes. Fixes #73339. Thanks @1yihui. - Channels/LINE: persist inbound image, video, audio, and file downloads in `~/.openclaw/media/inbound/` instead of temporary files so agents can still read LINE media after `/tmp` cleanup. Fixes #73370. Thanks @hijirii and @wenxu007. - CLI/plugins: keep bundled plugin installs out of `plugins.load.paths` while preserving install records, so install/inspect/doctor loops no longer warn about the current bundled plugin directory. Thanks @vincentkoc. +- Cron/Telegram: add `--thread-id` to `openclaw cron add` and `openclaw cron edit`, preserving Telegram forum topic delivery targets across scheduled announcements. Carries forward #51581, #60373, and #60890. Thanks @ChunHao-dev. - Control UI/WebChat: keep large attachment payloads out of Lit state and optimistic chat messages, using object URL previews plus send-time payload serialization so PDF/image uploads no longer trigger `RangeError: Maximum call stack size exceeded`. Fixes #73360; refs #54378 and #63432. Thanks @hejunhui-73, @Ansub, and @christianhernandez3-afk. - Agents/Anthropic: cancel stalled Anthropic Messages SSE body reads when abort signals fire, so active-memory timeouts release transport resources instead of leaving hidden recall runs parked on `reader.read()`. Refs #72965 and #73120. Thanks @wdeveloper16. - Control UI/WebChat: keep pending run and typing state attached to the active client run, so unowned inject/announce/side-result finals no longer unlock unrelated active runs while completed owned runs still clear promptly. Fixes #57795; carries forward the narrow diagnosis from #57887. Thanks @haoyu-haoyu. diff --git a/docs/cli/cron.md b/docs/cli/cron.md index a8c56babaac..feae8e3c03e 100644 --- a/docs/cli/cron.md +++ b/docs/cli/cron.md @@ -183,6 +183,12 @@ Announce to a specific channel: openclaw cron edit --announce --channel slack --to "channel:C1234567890" ``` +Announce to a Telegram forum topic: + +```bash +openclaw cron edit --announce --channel telegram --to "-1001234567890" --thread-id 42 +``` + Create an isolated job with lightweight bootstrap context: ```bash diff --git a/src/cli/cron-cli.test.ts b/src/cli/cron-cli.test.ts index 263dd917fa1..111bcda04ea 100644 --- a/src/cli/cron-cli.test.ts +++ b/src/cli/cron-cli.test.ts @@ -67,6 +67,7 @@ type CronUpdatePatch = { mode?: string; channel?: string; to?: string; + threadId?: number; accountId?: string; bestEffort?: boolean; }; @@ -81,7 +82,13 @@ type CronAddParams = { lightContext?: boolean; toolsAllow?: string[]; }; - delivery?: { mode?: string; accountId?: string }; + delivery?: { + mode?: string; + channel?: string; + to?: string; + threadId?: number; + accountId?: string; + }; deleteAfterRun?: boolean; agentId?: string; sessionTarget?: string; @@ -379,6 +386,32 @@ describe("cron cli", () => { expect(params?.delivery?.accountId).toBe("coordinator"); }); + it("includes --thread-id on Telegram cron add delivery", async () => { + const params = await runCronAddAndGetParams([ + "--name", + "telegram topic add", + "--cron", + "* * * * *", + "--session", + "SESSION:agent:ops:telegram:group:-100123:topic:42", + "--message", + "hello", + "--deliver", + "--channel", + "telegram", + "--to", + "-100123", + "--thread-id", + " 42 ", + ]); + + expect(params?.sessionTarget).toBe("session:agent:ops:telegram:group:-100123:topic:42"); + expect(params?.delivery?.mode).toBe("announce"); + expect(params?.delivery?.channel).toBe("telegram"); + expect(params?.delivery?.to).toBe("-100123"); + expect(params?.delivery?.threadId).toBe(42); + }); + it("rejects --account on non-isolated/systemEvent cron add", async () => { await expectCronCommandExit([ "cron", @@ -396,6 +429,40 @@ describe("cron cli", () => { ]); }); + it("rejects invalid --thread-id on cron add", async () => { + await expectCronCommandExit([ + "cron", + "add", + "--name", + "invalid topic add", + "--cron", + "* * * * *", + "--session", + "isolated", + "--message", + "hello", + "--thread-id", + "topic-42", + ]); + }); + + it("rejects negative --thread-id on cron add", async () => { + await expectCronCommandExit([ + "cron", + "add", + "--name", + "invalid negative topic add", + "--cron", + "* * * * *", + "--session", + "isolated", + "--message", + "hello", + "--thread-id", + "-5", + ]); + }); + it.each([ { command: "enable" as const, expectedEnabled: true }, { command: "disable" as const, expectedEnabled: false }, @@ -593,6 +660,47 @@ describe("cron cli", () => { expect(patch?.patch?.payload?.message).toBeUndefined(); }); + it("updates Telegram thread id without requiring --message on cron edit", async () => { + const patch = await runCronEditAndGetPatch([ + "--deliver", + "--channel", + "telegram", + "--to", + "-100123", + "--thread-id", + "42", + ]); + + expect(patch?.patch?.payload?.kind).toBe("agentTurn"); + expect(patch?.patch?.delivery?.mode).toBe("announce"); + expect(patch?.patch?.delivery?.channel).toBe("telegram"); + expect(patch?.patch?.delivery?.to).toBe("-100123"); + expect(patch?.patch?.delivery?.threadId).toBe(42); + }); + + it("preserves existing delivery mode on thread-only cron edit patches", async () => { + const patch = await runCronEditAndGetPatch(["--thread-id", "42"]); + + expect(patch?.patch?.payload?.kind).toBe("agentTurn"); + expect(patch?.patch?.delivery?.mode).toBeUndefined(); + expect(patch?.patch?.delivery?.threadId).toBe(42); + }); + + it("normalizes case-insensitive custom session targets on cron edit", async () => { + await runCronCommand(["cron", "edit", "job-1", "--session", "SESSION:Project-Alpha"]); + + const patch = getGatewayCallParams<{ patch?: { sessionTarget?: string } }>("cron.update"); + expect(patch?.patch?.sessionTarget).toBe("session:Project-Alpha"); + }); + + it("rejects invalid --thread-id on cron edit", async () => { + await expectCronCommandExit(["cron", "edit", "job-1", "--thread-id", "topic-42"]); + }); + + it("rejects negative --thread-id on cron edit", async () => { + await expectCronCommandExit(["cron", "edit", "job-1", "--thread-id", "-5"]); + }); + it("supports --no-deliver on cron edit", async () => { await runCronCommand(["cron", "edit", "job-1", "--no-deliver"]); @@ -857,6 +965,120 @@ describe("cron cli", () => { }); }); + it("paginates cron edit existing-job schedule lookups", async () => { + resetGatewayMock(); + callGatewayFromCli.mockImplementation( + async (method: string, _opts: unknown, params?: unknown) => { + if (method === "cron.status") { + return { enabled: true }; + } + if (method === "cron.list") { + const offset = (params as { offset?: number }).offset ?? 0; + if (offset === 0) { + return { + jobs: [ + { + ...createCronJob("first-page", "First Page"), + schedule: { kind: "cron", expr: "0 * * * *" }, + }, + ], + hasMore: true, + nextOffset: 200, + }; + } + return { + jobs: [ + { + ...createCronJob("job-1", "Target Job"), + schedule: { kind: "cron", expr: "0 */2 * * *", staggerMs: 300_000 }, + }, + ], + hasMore: false, + nextOffset: null, + }; + } + return { ok: true, params }; + }, + ); + + const program = buildProgram(); + await program.parseAsync(["cron", "edit", "job-1", "--exact"], { from: "user" }); + + const listParams = callGatewayFromCli.mock.calls + .filter((call) => call[0] === "cron.list") + .map((call) => call[2]); + expect(listParams).toEqual([ + { includeDisabled: true, limit: 200, offset: 0 }, + { includeDisabled: true, limit: 200, offset: 200 }, + ]); + + const patch = getGatewayCallParams("cron.update"); + expect(patch?.patch?.schedule).toEqual({ + kind: "cron", + expr: "0 */2 * * *", + staggerMs: 0, + }); + }); + + it("rejects non-advancing cron edit lookup pagination", async () => { + resetGatewayMock(); + callGatewayFromCli.mockImplementation( + async (method: string, _opts: unknown, params?: unknown) => { + if (method === "cron.status") { + return { enabled: true }; + } + if (method === "cron.list") { + return { + jobs: [], + hasMore: true, + nextOffset: (params as { offset?: number }).offset ?? 0, + }; + } + return { ok: true, params }; + }, + ); + + const program = buildProgram(); + await expect( + program.parseAsync(["cron", "edit", "job-1", "--exact"], { from: "user" }), + ).rejects.toThrow("__exit__:1"); + + expect(defaultRuntime.error).toHaveBeenCalledWith( + expect.stringContaining("cron.list pagination did not advance"), + ); + }); + + it("rejects excessive cron edit lookup pagination", async () => { + resetGatewayMock(); + callGatewayFromCli.mockImplementation( + async (method: string, _opts: unknown, params?: unknown) => { + if (method === "cron.status") { + return { enabled: true }; + } + if (method === "cron.list") { + const offset = (params as { offset?: number }).offset ?? 0; + return { + jobs: [], + hasMore: true, + nextOffset: offset + 200, + }; + } + return { ok: true, params }; + }, + ); + + const program = buildProgram(); + await expect( + program.parseAsync(["cron", "edit", "job-1", "--exact"], { from: "user" }), + ).rejects.toThrow("__exit__:1"); + + const listCalls = callGatewayFromCli.mock.calls.filter((call) => call[0] === "cron.list"); + expect(listCalls).toHaveLength(50); + expect(defaultRuntime.error).toHaveBeenCalledWith( + expect.stringContaining("cron.list pagination exceeded maximum pages"), + ); + }); + it("rejects --exact on edit when existing job is not cron", async () => { await expectCronEditWithScheduleLookupExit({ kind: "every", everyMs: 60_000 }, ["--exact"]); }); diff --git a/src/cli/cron-cli/register.cron-add.ts b/src/cli/cron-cli/register.cron-add.ts index 5e7c0ce96bb..0381bffdb29 100644 --- a/src/cli/cron-cli/register.cron-add.ts +++ b/src/cli/cron-cli/register.cron-add.ts @@ -19,6 +19,7 @@ import { printCronList, warnIfCronSchedulerDisabled, } from "./shared.js"; +import { normalizeCronSessionTargetOption, parseCronThreadIdOption } from "./thread-id-shared.js"; export function registerCronStatusCommand(cron: Command) { addGatewayClientOptions( @@ -105,6 +106,7 @@ export function registerCronAddCommand(cron: Command) { "--to ", "Delivery destination (E.164, Telegram chatId, or Discord channel/user)", ) + .option("--thread-id ", "Telegram forum topic thread id") .option("--account ", "Channel account id for delivery (multi-account setups)") .option("--best-effort-deliver", "Do not fail the job if delivery fails", false) .option("--json", "Output JSON", false) @@ -165,7 +167,9 @@ export function registerCronAddCommand(cron: Command) { const sessionTargetRaw = normalizeOptionalString(opts.session) ?? ""; const inferredSessionTarget = payload.kind === "agentTurn" ? "isolated" : "main"; const sessionTarget = - sessionSource === "cli" ? sessionTargetRaw || "" : inferredSessionTarget; + sessionSource === "cli" + ? normalizeCronSessionTargetOption(sessionTargetRaw) || "" + : inferredSessionTarget; const isCustomSessionTarget = normalizeLowercaseStringOrEmpty(sessionTarget).startsWith("session:") && Boolean(normalizeOptionalString(sessionTarget.slice(8))); @@ -193,9 +197,16 @@ export function registerCronAddCommand(cron: Command) { } const accountId = normalizeOptionalString(opts.account); + const threadId = parseCronThreadIdOption(opts.threadId); + const hasThreadId = typeof threadId === "number"; - if (accountId && (!isIsolatedLikeSessionTarget || payload.kind !== "agentTurn")) { - throw new Error("--account requires a non-main agentTurn job with delivery."); + if ( + (accountId || hasThreadId) && + (!isIsolatedLikeSessionTarget || payload.kind !== "agentTurn") + ) { + throw new Error( + "--account and --thread-id require a non-main agentTurn job with delivery.", + ); } const deliveryMode = @@ -232,6 +243,7 @@ export function registerCronAddCommand(cron: Command) { mode: deliveryMode, channel: normalizeOptionalString(opts.channel), to: normalizeOptionalString(opts.to), + threadId, accountId, bestEffort: opts.bestEffortDeliver ? true : undefined, } diff --git a/src/cli/cron-cli/register.cron-edit.ts b/src/cli/cron-cli/register.cron-edit.ts index f24691025d9..df6fcaf77d6 100644 --- a/src/cli/cron-cli/register.cron-edit.ts +++ b/src/cli/cron-cli/register.cron-edit.ts @@ -18,6 +18,10 @@ import { parseDurationMs, warnIfCronSchedulerDisabled, } from "./shared.js"; +import { normalizeCronSessionTargetOption, parseCronThreadIdOption } from "./thread-id-shared.js"; + +const CRON_EDIT_LOOKUP_PAGE_SIZE = 200; +const CRON_EDIT_LOOKUP_MAX_PAGES = 50; const assignIf = ( target: Record, @@ -30,6 +34,32 @@ const assignIf = ( } }; +async function loadCronJobForEditSchedulePatch( + opts: Record, + id: string, +): Promise { + let offset = 0; + for (let page = 0; page < CRON_EDIT_LOOKUP_MAX_PAGES; page += 1) { + const listed = (await callGatewayFromCli("cron.list", opts, { + includeDisabled: true, + limit: CRON_EDIT_LOOKUP_PAGE_SIZE, + offset, + })) as { jobs?: CronJob[]; hasMore?: boolean; nextOffset?: number | null } | null; + const existing = (listed?.jobs ?? []).find((job) => job.id === id); + if (existing) { + return existing; + } + if (!listed?.hasMore || typeof listed.nextOffset !== "number") { + return undefined; + } + if (listed.nextOffset <= offset) { + throw new Error("cron.list pagination did not advance while looking up cron job"); + } + offset = listed.nextOffset; + } + throw new Error("cron.list pagination exceeded maximum pages while looking up cron job"); +} + export function registerCronEditCommand(cron: Command) { addGatewayClientOptions( cron @@ -74,6 +104,7 @@ export function registerCronEditCommand(cron: Command) { "--to ", "Delivery destination (E.164, Telegram chatId, or Discord channel/user)", ) + .option("--thread-id ", "Telegram forum topic thread id") .option("--account ", "Channel account id for delivery (multi-account setups)") .option("--best-effort-deliver", "Do not fail job if delivery fails") .option("--no-best-effort-deliver", "Fail job when delivery fails") @@ -95,12 +126,24 @@ export function registerCronEditCommand(cron: Command) { ) .action(async (id, opts) => { try { - if (opts.session === "main" && opts.message) { + const sessionTarget = + typeof opts.session === "string" + ? normalizeCronSessionTargetOption(opts.session) + : undefined; + if (typeof opts.session === "string" && !sessionTarget) { + throw new Error("--session must be main, isolated, current, or session:"); + } + if (sessionTarget === "main" && opts.message) { throw new Error( "Main jobs cannot use --message; use --system-event or --session isolated.", ); } - if (opts.session === "isolated" && opts.systemEvent) { + if ( + (sessionTarget === "isolated" || + sessionTarget === "current" || + sessionTarget?.startsWith("session:")) && + opts.systemEvent + ) { throw new Error( "Isolated jobs cannot use --system-event; use --message or --session main.", ); @@ -134,7 +177,7 @@ export function registerCronEditCommand(cron: Command) { patch.deleteAfterRun = false; } if (typeof opts.session === "string") { - patch.sessionTarget = opts.session; + patch.sessionTarget = sessionTarget; } if (typeof opts.wake === "string") { patch.wakeMode = opts.wake; @@ -169,10 +212,7 @@ export function registerCronEditCommand(cron: Command) { if (scheduleRequest.kind === "direct") { patch.schedule = scheduleRequest.schedule; } else if (scheduleRequest.kind === "patch-existing-cron") { - const listed = (await callGatewayFromCli("cron.list", opts, { - includeDisabled: true, - })) as { jobs?: CronJob[] } | null; - const existing = (listed?.jobs ?? []).find((job) => job.id === id); + const existing = await loadCronJobForEditSchedulePatch(opts, String(id)); if (!existing) { throw new Error(`unknown cron job id: ${id}`); } @@ -188,7 +228,10 @@ export function registerCronEditCommand(cron: Command) { : undefined; const hasTimeoutSeconds = Boolean(timeoutSeconds && Number.isFinite(timeoutSeconds)); const hasDeliveryModeFlag = opts.announce || typeof opts.deliver === "boolean"; - const hasDeliveryTarget = typeof opts.channel === "string" || typeof opts.to === "string"; + const threadId = parseCronThreadIdOption(opts.threadId); + const hasDeliveryThreadId = typeof threadId === "number"; + const hasDeliveryTarget = + typeof opts.channel === "string" || typeof opts.to === "string" || hasDeliveryThreadId; const hasDeliveryAccount = typeof opts.account === "string"; const hasBestEffort = typeof opts.bestEffortDeliver === "boolean"; const hasAgentTurnPatch = @@ -248,6 +291,9 @@ export function registerCronEditCommand(cron: Command) { const to = opts.to.trim(); delivery.to = to ? to : undefined; } + if (hasDeliveryThreadId) { + delivery.threadId = threadId; + } if (typeof opts.account === "string") { const account = opts.account.trim(); delivery.accountId = account ? account : undefined; diff --git a/src/cli/cron-cli/thread-id-shared.ts b/src/cli/cron-cli/thread-id-shared.ts new file mode 100644 index 00000000000..c94b77469ad --- /dev/null +++ b/src/cli/cron-cli/thread-id-shared.ts @@ -0,0 +1,35 @@ +import { + normalizeLowercaseStringOrEmpty, + normalizeOptionalString, +} from "../../shared/string-coerce.js"; + +export function parseCronThreadIdOption(value: unknown): number | undefined { + const raw = normalizeOptionalString(value); + if (!raw) { + return undefined; + } + if (!/^\d+$/.test(raw)) { + throw new Error("--thread-id must be a positive integer Telegram topic thread id"); + } + const parsed = Number.parseInt(raw, 10); + if (!Number.isSafeInteger(parsed) || parsed <= 0) { + throw new Error("--thread-id must be a safe positive integer Telegram topic thread id"); + } + return parsed; +} + +export function normalizeCronSessionTargetOption(value: unknown): string | undefined { + const raw = normalizeOptionalString(value); + if (!raw) { + return undefined; + } + const lower = normalizeLowercaseStringOrEmpty(raw); + if (lower === "main" || lower === "isolated" || lower === "current") { + return lower; + } + if (lower.startsWith("session:")) { + const id = normalizeOptionalString(raw.slice(8)); + return id ? `session:${id}` : undefined; + } + return undefined; +}