fix(cron): support Telegram thread IDs in cron add/edit

- Add `--thread-id` support to cron add/edit Telegram delivery.
- Reject non-positive thread IDs and guard cron edit lookup pagination against non-progress/max-page loops.
- Preserve existing delivery mode on thread-only cron edit patches.

Carries forward #51581, #60373, and #60890.

Co-authored-by: ChunHao Chen <crazycjh@gmail.com>
This commit is contained in:
Vincent Koc
2026-04-28 01:50:44 -07:00
committed by GitHub
parent 02908db62b
commit 76cd97289b
6 changed files with 334 additions and 12 deletions

View File

@@ -22,6 +22,7 @@ Docs: https://docs.openclaw.ai
- Plugins/startup: precompute bundled runtime mirror fingerprints before taking the mirror lock and keep Docker bundled plugin runtime deps/mirrors in a Docker-managed volume instead of the Windows/WSL config bind mount, so cold starts avoid slow host-volume mirror writes. Fixes #73339. Thanks @1yihui.
- Channels/LINE: persist inbound image, video, audio, and file downloads in `~/.openclaw/media/inbound/` instead of temporary files so agents can still read LINE media after `/tmp` cleanup. Fixes #73370. Thanks @hijirii and @wenxu007.
- CLI/plugins: keep bundled plugin installs out of `plugins.load.paths` while preserving install records, so install/inspect/doctor loops no longer warn about the current bundled plugin directory. Thanks @vincentkoc.
- Cron/Telegram: add `--thread-id` to `openclaw cron add` and `openclaw cron edit`, preserving Telegram forum topic delivery targets across scheduled announcements. Carries forward #51581, #60373, and #60890. Thanks @ChunHao-dev.
- Control UI/WebChat: keep large attachment payloads out of Lit state and optimistic chat messages, using object URL previews plus send-time payload serialization so PDF/image uploads no longer trigger `RangeError: Maximum call stack size exceeded`. Fixes #73360; refs #54378 and #63432. Thanks @hejunhui-73, @Ansub, and @christianhernandez3-afk.
- Agents/Anthropic: cancel stalled Anthropic Messages SSE body reads when abort signals fire, so active-memory timeouts release transport resources instead of leaving hidden recall runs parked on `reader.read()`. Refs #72965 and #73120. Thanks @wdeveloper16.
- Control UI/WebChat: keep pending run and typing state attached to the active client run, so unowned inject/announce/side-result finals no longer unlock unrelated active runs while completed owned runs still clear promptly. Fixes #57795; carries forward the narrow diagnosis from #57887. Thanks @haoyu-haoyu.

View File

@@ -183,6 +183,12 @@ Announce to a specific channel:
openclaw cron edit <job-id> --announce --channel slack --to "channel:C1234567890"
```
Announce to a Telegram forum topic:
```bash
openclaw cron edit <job-id> --announce --channel telegram --to "-1001234567890" --thread-id 42
```
Create an isolated job with lightweight bootstrap context:
```bash

View File

@@ -67,6 +67,7 @@ type CronUpdatePatch = {
mode?: string;
channel?: string;
to?: string;
threadId?: number;
accountId?: string;
bestEffort?: boolean;
};
@@ -81,7 +82,13 @@ type CronAddParams = {
lightContext?: boolean;
toolsAllow?: string[];
};
delivery?: { mode?: string; accountId?: string };
delivery?: {
mode?: string;
channel?: string;
to?: string;
threadId?: number;
accountId?: string;
};
deleteAfterRun?: boolean;
agentId?: string;
sessionTarget?: string;
@@ -379,6 +386,32 @@ describe("cron cli", () => {
expect(params?.delivery?.accountId).toBe("coordinator");
});
it("includes --thread-id on Telegram cron add delivery", async () => {
const params = await runCronAddAndGetParams([
"--name",
"telegram topic add",
"--cron",
"* * * * *",
"--session",
"SESSION:agent:ops:telegram:group:-100123:topic:42",
"--message",
"hello",
"--deliver",
"--channel",
"telegram",
"--to",
"-100123",
"--thread-id",
" 42 ",
]);
expect(params?.sessionTarget).toBe("session:agent:ops:telegram:group:-100123:topic:42");
expect(params?.delivery?.mode).toBe("announce");
expect(params?.delivery?.channel).toBe("telegram");
expect(params?.delivery?.to).toBe("-100123");
expect(params?.delivery?.threadId).toBe(42);
});
it("rejects --account on non-isolated/systemEvent cron add", async () => {
await expectCronCommandExit([
"cron",
@@ -396,6 +429,40 @@ describe("cron cli", () => {
]);
});
it("rejects invalid --thread-id on cron add", async () => {
await expectCronCommandExit([
"cron",
"add",
"--name",
"invalid topic add",
"--cron",
"* * * * *",
"--session",
"isolated",
"--message",
"hello",
"--thread-id",
"topic-42",
]);
});
it("rejects negative --thread-id on cron add", async () => {
await expectCronCommandExit([
"cron",
"add",
"--name",
"invalid negative topic add",
"--cron",
"* * * * *",
"--session",
"isolated",
"--message",
"hello",
"--thread-id",
"-5",
]);
});
it.each([
{ command: "enable" as const, expectedEnabled: true },
{ command: "disable" as const, expectedEnabled: false },
@@ -593,6 +660,47 @@ describe("cron cli", () => {
expect(patch?.patch?.payload?.message).toBeUndefined();
});
it("updates Telegram thread id without requiring --message on cron edit", async () => {
const patch = await runCronEditAndGetPatch([
"--deliver",
"--channel",
"telegram",
"--to",
"-100123",
"--thread-id",
"42",
]);
expect(patch?.patch?.payload?.kind).toBe("agentTurn");
expect(patch?.patch?.delivery?.mode).toBe("announce");
expect(patch?.patch?.delivery?.channel).toBe("telegram");
expect(patch?.patch?.delivery?.to).toBe("-100123");
expect(patch?.patch?.delivery?.threadId).toBe(42);
});
it("preserves existing delivery mode on thread-only cron edit patches", async () => {
const patch = await runCronEditAndGetPatch(["--thread-id", "42"]);
expect(patch?.patch?.payload?.kind).toBe("agentTurn");
expect(patch?.patch?.delivery?.mode).toBeUndefined();
expect(patch?.patch?.delivery?.threadId).toBe(42);
});
it("normalizes case-insensitive custom session targets on cron edit", async () => {
await runCronCommand(["cron", "edit", "job-1", "--session", "SESSION:Project-Alpha"]);
const patch = getGatewayCallParams<{ patch?: { sessionTarget?: string } }>("cron.update");
expect(patch?.patch?.sessionTarget).toBe("session:Project-Alpha");
});
it("rejects invalid --thread-id on cron edit", async () => {
await expectCronCommandExit(["cron", "edit", "job-1", "--thread-id", "topic-42"]);
});
it("rejects negative --thread-id on cron edit", async () => {
await expectCronCommandExit(["cron", "edit", "job-1", "--thread-id", "-5"]);
});
it("supports --no-deliver on cron edit", async () => {
await runCronCommand(["cron", "edit", "job-1", "--no-deliver"]);
@@ -857,6 +965,120 @@ describe("cron cli", () => {
});
});
it("paginates cron edit existing-job schedule lookups", async () => {
resetGatewayMock();
callGatewayFromCli.mockImplementation(
async (method: string, _opts: unknown, params?: unknown) => {
if (method === "cron.status") {
return { enabled: true };
}
if (method === "cron.list") {
const offset = (params as { offset?: number }).offset ?? 0;
if (offset === 0) {
return {
jobs: [
{
...createCronJob("first-page", "First Page"),
schedule: { kind: "cron", expr: "0 * * * *" },
},
],
hasMore: true,
nextOffset: 200,
};
}
return {
jobs: [
{
...createCronJob("job-1", "Target Job"),
schedule: { kind: "cron", expr: "0 */2 * * *", staggerMs: 300_000 },
},
],
hasMore: false,
nextOffset: null,
};
}
return { ok: true, params };
},
);
const program = buildProgram();
await program.parseAsync(["cron", "edit", "job-1", "--exact"], { from: "user" });
const listParams = callGatewayFromCli.mock.calls
.filter((call) => call[0] === "cron.list")
.map((call) => call[2]);
expect(listParams).toEqual([
{ includeDisabled: true, limit: 200, offset: 0 },
{ includeDisabled: true, limit: 200, offset: 200 },
]);
const patch = getGatewayCallParams<CronUpdatePatch>("cron.update");
expect(patch?.patch?.schedule).toEqual({
kind: "cron",
expr: "0 */2 * * *",
staggerMs: 0,
});
});
it("rejects non-advancing cron edit lookup pagination", async () => {
resetGatewayMock();
callGatewayFromCli.mockImplementation(
async (method: string, _opts: unknown, params?: unknown) => {
if (method === "cron.status") {
return { enabled: true };
}
if (method === "cron.list") {
return {
jobs: [],
hasMore: true,
nextOffset: (params as { offset?: number }).offset ?? 0,
};
}
return { ok: true, params };
},
);
const program = buildProgram();
await expect(
program.parseAsync(["cron", "edit", "job-1", "--exact"], { from: "user" }),
).rejects.toThrow("__exit__:1");
expect(defaultRuntime.error).toHaveBeenCalledWith(
expect.stringContaining("cron.list pagination did not advance"),
);
});
it("rejects excessive cron edit lookup pagination", async () => {
resetGatewayMock();
callGatewayFromCli.mockImplementation(
async (method: string, _opts: unknown, params?: unknown) => {
if (method === "cron.status") {
return { enabled: true };
}
if (method === "cron.list") {
const offset = (params as { offset?: number }).offset ?? 0;
return {
jobs: [],
hasMore: true,
nextOffset: offset + 200,
};
}
return { ok: true, params };
},
);
const program = buildProgram();
await expect(
program.parseAsync(["cron", "edit", "job-1", "--exact"], { from: "user" }),
).rejects.toThrow("__exit__:1");
const listCalls = callGatewayFromCli.mock.calls.filter((call) => call[0] === "cron.list");
expect(listCalls).toHaveLength(50);
expect(defaultRuntime.error).toHaveBeenCalledWith(
expect.stringContaining("cron.list pagination exceeded maximum pages"),
);
});
it("rejects --exact on edit when existing job is not cron", async () => {
await expectCronEditWithScheduleLookupExit({ kind: "every", everyMs: 60_000 }, ["--exact"]);
});

View File

@@ -19,6 +19,7 @@ import {
printCronList,
warnIfCronSchedulerDisabled,
} from "./shared.js";
import { normalizeCronSessionTargetOption, parseCronThreadIdOption } from "./thread-id-shared.js";
export function registerCronStatusCommand(cron: Command) {
addGatewayClientOptions(
@@ -105,6 +106,7 @@ export function registerCronAddCommand(cron: Command) {
"--to <dest>",
"Delivery destination (E.164, Telegram chatId, or Discord channel/user)",
)
.option("--thread-id <id>", "Telegram forum topic thread id")
.option("--account <id>", "Channel account id for delivery (multi-account setups)")
.option("--best-effort-deliver", "Do not fail the job if delivery fails", false)
.option("--json", "Output JSON", false)
@@ -165,7 +167,9 @@ export function registerCronAddCommand(cron: Command) {
const sessionTargetRaw = normalizeOptionalString(opts.session) ?? "";
const inferredSessionTarget = payload.kind === "agentTurn" ? "isolated" : "main";
const sessionTarget =
sessionSource === "cli" ? sessionTargetRaw || "" : inferredSessionTarget;
sessionSource === "cli"
? normalizeCronSessionTargetOption(sessionTargetRaw) || ""
: inferredSessionTarget;
const isCustomSessionTarget =
normalizeLowercaseStringOrEmpty(sessionTarget).startsWith("session:") &&
Boolean(normalizeOptionalString(sessionTarget.slice(8)));
@@ -193,9 +197,16 @@ export function registerCronAddCommand(cron: Command) {
}
const accountId = normalizeOptionalString(opts.account);
const threadId = parseCronThreadIdOption(opts.threadId);
const hasThreadId = typeof threadId === "number";
if (accountId && (!isIsolatedLikeSessionTarget || payload.kind !== "agentTurn")) {
throw new Error("--account requires a non-main agentTurn job with delivery.");
if (
(accountId || hasThreadId) &&
(!isIsolatedLikeSessionTarget || payload.kind !== "agentTurn")
) {
throw new Error(
"--account and --thread-id require a non-main agentTurn job with delivery.",
);
}
const deliveryMode =
@@ -232,6 +243,7 @@ export function registerCronAddCommand(cron: Command) {
mode: deliveryMode,
channel: normalizeOptionalString(opts.channel),
to: normalizeOptionalString(opts.to),
threadId,
accountId,
bestEffort: opts.bestEffortDeliver ? true : undefined,
}

View File

@@ -18,6 +18,10 @@ import {
parseDurationMs,
warnIfCronSchedulerDisabled,
} from "./shared.js";
import { normalizeCronSessionTargetOption, parseCronThreadIdOption } from "./thread-id-shared.js";
const CRON_EDIT_LOOKUP_PAGE_SIZE = 200;
const CRON_EDIT_LOOKUP_MAX_PAGES = 50;
const assignIf = (
target: Record<string, unknown>,
@@ -30,6 +34,32 @@ const assignIf = (
}
};
async function loadCronJobForEditSchedulePatch(
opts: Record<string, unknown>,
id: string,
): Promise<CronJob | undefined> {
let offset = 0;
for (let page = 0; page < CRON_EDIT_LOOKUP_MAX_PAGES; page += 1) {
const listed = (await callGatewayFromCli("cron.list", opts, {
includeDisabled: true,
limit: CRON_EDIT_LOOKUP_PAGE_SIZE,
offset,
})) as { jobs?: CronJob[]; hasMore?: boolean; nextOffset?: number | null } | null;
const existing = (listed?.jobs ?? []).find((job) => job.id === id);
if (existing) {
return existing;
}
if (!listed?.hasMore || typeof listed.nextOffset !== "number") {
return undefined;
}
if (listed.nextOffset <= offset) {
throw new Error("cron.list pagination did not advance while looking up cron job");
}
offset = listed.nextOffset;
}
throw new Error("cron.list pagination exceeded maximum pages while looking up cron job");
}
export function registerCronEditCommand(cron: Command) {
addGatewayClientOptions(
cron
@@ -74,6 +104,7 @@ export function registerCronEditCommand(cron: Command) {
"--to <dest>",
"Delivery destination (E.164, Telegram chatId, or Discord channel/user)",
)
.option("--thread-id <id>", "Telegram forum topic thread id")
.option("--account <id>", "Channel account id for delivery (multi-account setups)")
.option("--best-effort-deliver", "Do not fail job if delivery fails")
.option("--no-best-effort-deliver", "Fail job when delivery fails")
@@ -95,12 +126,24 @@ export function registerCronEditCommand(cron: Command) {
)
.action(async (id, opts) => {
try {
if (opts.session === "main" && opts.message) {
const sessionTarget =
typeof opts.session === "string"
? normalizeCronSessionTargetOption(opts.session)
: undefined;
if (typeof opts.session === "string" && !sessionTarget) {
throw new Error("--session must be main, isolated, current, or session:<id>");
}
if (sessionTarget === "main" && opts.message) {
throw new Error(
"Main jobs cannot use --message; use --system-event or --session isolated.",
);
}
if (opts.session === "isolated" && opts.systemEvent) {
if (
(sessionTarget === "isolated" ||
sessionTarget === "current" ||
sessionTarget?.startsWith("session:")) &&
opts.systemEvent
) {
throw new Error(
"Isolated jobs cannot use --system-event; use --message or --session main.",
);
@@ -134,7 +177,7 @@ export function registerCronEditCommand(cron: Command) {
patch.deleteAfterRun = false;
}
if (typeof opts.session === "string") {
patch.sessionTarget = opts.session;
patch.sessionTarget = sessionTarget;
}
if (typeof opts.wake === "string") {
patch.wakeMode = opts.wake;
@@ -169,10 +212,7 @@ export function registerCronEditCommand(cron: Command) {
if (scheduleRequest.kind === "direct") {
patch.schedule = scheduleRequest.schedule;
} else if (scheduleRequest.kind === "patch-existing-cron") {
const listed = (await callGatewayFromCli("cron.list", opts, {
includeDisabled: true,
})) as { jobs?: CronJob[] } | null;
const existing = (listed?.jobs ?? []).find((job) => job.id === id);
const existing = await loadCronJobForEditSchedulePatch(opts, String(id));
if (!existing) {
throw new Error(`unknown cron job id: ${id}`);
}
@@ -188,7 +228,10 @@ export function registerCronEditCommand(cron: Command) {
: undefined;
const hasTimeoutSeconds = Boolean(timeoutSeconds && Number.isFinite(timeoutSeconds));
const hasDeliveryModeFlag = opts.announce || typeof opts.deliver === "boolean";
const hasDeliveryTarget = typeof opts.channel === "string" || typeof opts.to === "string";
const threadId = parseCronThreadIdOption(opts.threadId);
const hasDeliveryThreadId = typeof threadId === "number";
const hasDeliveryTarget =
typeof opts.channel === "string" || typeof opts.to === "string" || hasDeliveryThreadId;
const hasDeliveryAccount = typeof opts.account === "string";
const hasBestEffort = typeof opts.bestEffortDeliver === "boolean";
const hasAgentTurnPatch =
@@ -248,6 +291,9 @@ export function registerCronEditCommand(cron: Command) {
const to = opts.to.trim();
delivery.to = to ? to : undefined;
}
if (hasDeliveryThreadId) {
delivery.threadId = threadId;
}
if (typeof opts.account === "string") {
const account = opts.account.trim();
delivery.accountId = account ? account : undefined;

View File

@@ -0,0 +1,35 @@
import {
normalizeLowercaseStringOrEmpty,
normalizeOptionalString,
} from "../../shared/string-coerce.js";
export function parseCronThreadIdOption(value: unknown): number | undefined {
const raw = normalizeOptionalString(value);
if (!raw) {
return undefined;
}
if (!/^\d+$/.test(raw)) {
throw new Error("--thread-id must be a positive integer Telegram topic thread id");
}
const parsed = Number.parseInt(raw, 10);
if (!Number.isSafeInteger(parsed) || parsed <= 0) {
throw new Error("--thread-id must be a safe positive integer Telegram topic thread id");
}
return parsed;
}
export function normalizeCronSessionTargetOption(value: unknown): string | undefined {
const raw = normalizeOptionalString(value);
if (!raw) {
return undefined;
}
const lower = normalizeLowercaseStringOrEmpty(raw);
if (lower === "main" || lower === "isolated" || lower === "current") {
return lower;
}
if (lower.startsWith("session:")) {
const id = normalizeOptionalString(raw.slice(8));
return id ? `session:${id}` : undefined;
}
return undefined;
}