diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d1d2c50b5d..7ba94a0c3a9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -80,6 +80,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Cron/Failure alerts: add configurable repeated-failure alerting with per-job overrides and Web UI cron editor support (`inherit|disabled|custom` with threshold/cooldown/channel/target fields). (#24789) Thanks xbrak. - Cron/Isolated model defaults: resolve isolated cron `subagents.model` (including object-form `primary`) through allowlist-aware model selection so isolated cron runs honor subagent model defaults unless explicitly overridden by job payload model. (#11474) Thanks @AnonO6. - Cron/Isolated sessions list: persist the intended pre-run model/provider on isolated cron session entries so `sessions_list` reflects payload/session model overrides even when runs fail before post-run telemetry persistence. (#21279) Thanks @altaywtf. - Cron/One-shot reliability: retry transient one-shot failures with bounded backoff and configurable retry policy before disabling. (#24435) Thanks . diff --git a/src/cli/cron-cli.test.ts b/src/cli/cron-cli.test.ts index 415a58b08f2..1483a8ec660 100644 --- a/src/cli/cron-cli.test.ts +++ b/src/cli/cron-cli.test.ts @@ -551,4 +551,53 @@ describe("cron cli", () => { it("rejects --exact on edit when existing job is not cron", async () => { await expectCronEditWithScheduleLookupExit({ kind: "every", everyMs: 60_000 }, ["--exact"]); }); + + it("patches failure alert settings on cron edit", async () => { + callGatewayFromCli.mockClear(); + + const program = buildProgram(); + + await program.parseAsync( + [ + "cron", + "edit", + "job-1", + "--failure-alert-after", + "3", + "--failure-alert-cooldown", + "1h", + "--failure-alert-channel", + "telegram", + "--failure-alert-to", + "19098680", + ], + { from: "user" }, + ); + + const updateCall = callGatewayFromCli.mock.calls.find((call) => call[0] === "cron.update"); + const patch = updateCall?.[2] as { + patch?: { + failureAlert?: { after?: number; cooldownMs?: number; channel?: string; to?: string }; + }; + }; + + expect(patch?.patch?.failureAlert?.after).toBe(3); + expect(patch?.patch?.failureAlert?.cooldownMs).toBe(3_600_000); + expect(patch?.patch?.failureAlert?.channel).toBe("telegram"); + expect(patch?.patch?.failureAlert?.to).toBe("19098680"); + }); + + it("supports --no-failure-alert on cron edit", async () => { + callGatewayFromCli.mockClear(); + + const program = buildProgram(); + + await program.parseAsync(["cron", "edit", "job-1", "--no-failure-alert"], { + from: "user", + }); + + const updateCall = callGatewayFromCli.mock.calls.find((call) => call[0] === "cron.update"); + const patch = updateCall?.[2] as { patch?: { failureAlert?: boolean } }; + expect(patch?.patch?.failureAlert).toBe(false); + }); }); diff --git a/src/cli/cron-cli/register.cron-edit.ts b/src/cli/cron-cli/register.cron-edit.ts index 7802d700e7d..ac473e53bf9 100644 --- a/src/cli/cron-cli/register.cron-edit.ts +++ b/src/cli/cron-cli/register.cron-edit.ts @@ -62,6 +62,15 @@ export function registerCronEditCommand(cron: Command) { .option("--account ", "Channel account id for delivery (multi-account setups)") .option("--best-effort-deliver", "Do not fail job if delivery fails") .option("--no-best-effort-deliver", "Fail job when delivery fails") + .option("--failure-alert", "Enable failure alerts for this job") + .option("--no-failure-alert", "Disable failure alerts for this job") + .option("--failure-alert-after ", "Alert after N consecutive job errors") + .option( + "--failure-alert-channel ", + `Failure alert channel (${getCronChannelOptions()})`, + ) + .option("--failure-alert-to ", "Failure alert destination") + .option("--failure-alert-cooldown ", "Minimum time between alerts (e.g. 1h, 30m)") .action(async (id, opts) => { try { if (opts.session === "main" && opts.message) { @@ -264,6 +273,49 @@ export function registerCronEditCommand(cron: Command) { patch.delivery = delivery; } + const hasFailureAlertAfter = typeof opts.failureAlertAfter === "string"; + const hasFailureAlertChannel = typeof opts.failureAlertChannel === "string"; + const hasFailureAlertTo = typeof opts.failureAlertTo === "string"; + const hasFailureAlertCooldown = typeof opts.failureAlertCooldown === "string"; + const hasFailureAlertFields = + hasFailureAlertAfter || + hasFailureAlertChannel || + hasFailureAlertTo || + hasFailureAlertCooldown; + const failureAlertFlag = + typeof opts.failureAlert === "boolean" ? opts.failureAlert : undefined; + if (failureAlertFlag === false && hasFailureAlertFields) { + throw new Error("Use --no-failure-alert alone (without failure-alert-* options)."); + } + if (failureAlertFlag === false) { + patch.failureAlert = false; + } else if (failureAlertFlag === true || hasFailureAlertFields) { + const failureAlert: Record = {}; + if (hasFailureAlertAfter) { + const after = Number.parseInt(String(opts.failureAlertAfter), 10); + if (!Number.isFinite(after) || after <= 0) { + throw new Error("Invalid --failure-alert-after (must be a positive integer)."); + } + failureAlert.after = after; + } + if (hasFailureAlertChannel) { + const channel = String(opts.failureAlertChannel).trim().toLowerCase(); + failureAlert.channel = channel ? channel : undefined; + } + if (hasFailureAlertTo) { + const to = String(opts.failureAlertTo).trim(); + failureAlert.to = to ? to : undefined; + } + if (hasFailureAlertCooldown) { + const cooldownMs = parseDurationMs(String(opts.failureAlertCooldown)); + if (!cooldownMs && cooldownMs !== 0) { + throw new Error("Invalid --failure-alert-cooldown."); + } + failureAlert.cooldownMs = cooldownMs; + } + patch.failureAlert = failureAlert; + } + const res = await callGatewayFromCli("cron.update", opts, { id, patch, diff --git a/src/config/types.cron.ts b/src/config/types.cron.ts index 6568f4ad72a..427b1044477 100644 --- a/src/config/types.cron.ts +++ b/src/config/types.cron.ts @@ -10,6 +10,12 @@ export type CronRetryConfig = { retryOn?: CronRetryOn[]; }; +export type CronFailureAlertConfig = { + enabled?: boolean; + after?: number; + cooldownMs?: number; +}; + export type CronConfig = { enabled?: boolean; store?: string; @@ -37,4 +43,5 @@ export type CronConfig = { maxBytes?: number | string; keepLines?: number; }; + failureAlert?: CronFailureAlertConfig; }; diff --git a/src/config/zod-schema.ts b/src/config/zod-schema.ts index ca9362dbcbe..c4e3adfc36f 100644 --- a/src/config/zod-schema.ts +++ b/src/config/zod-schema.ts @@ -395,6 +395,14 @@ export const OpenClawSchema = z }) .strict() .optional(), + failureAlert: z + .object({ + enabled: z.boolean().optional(), + after: z.number().int().min(1).optional(), + cooldownMs: z.number().int().min(0).optional(), + }) + .strict() + .optional(), }) .strict() .superRefine((val, ctx) => { diff --git a/src/cron/service.failure-alert.test.ts b/src/cron/service.failure-alert.test.ts new file mode 100644 index 00000000000..49ddac71409 --- /dev/null +++ b/src/cron/service.failure-alert.test.ts @@ -0,0 +1,198 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { CronService } from "./service.js"; + +const noopLogger = { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), +}; + +async function makeStorePath() { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cron-failure-alert-")); + return { + storePath: path.join(dir, "cron", "jobs.json"), + cleanup: async () => { + await fs.rm(dir, { recursive: true, force: true }); + }, + }; +} + +describe("CronService failure alerts", () => { + beforeEach(() => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z")); + noopLogger.debug.mockClear(); + noopLogger.info.mockClear(); + noopLogger.warn.mockClear(); + noopLogger.error.mockClear(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("alerts after configured consecutive failures and honors cooldown", async () => { + const store = await makeStorePath(); + const sendCronFailureAlert = vi.fn(async () => undefined); + const runIsolatedAgentJob = vi.fn(async () => ({ + status: "error" as const, + error: "wrong model id", + })); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + cronConfig: { + failureAlert: { + enabled: true, + after: 2, + cooldownMs: 60_000, + }, + }, + log: noopLogger, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob, + sendCronFailureAlert, + }); + + await cron.start(); + const job = await cron.add({ + name: "daily report", + enabled: true, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "run report" }, + delivery: { mode: "announce", channel: "telegram", to: "19098680" }, + }); + + await cron.run(job.id, "force"); + expect(sendCronFailureAlert).not.toHaveBeenCalled(); + + await cron.run(job.id, "force"); + expect(sendCronFailureAlert).toHaveBeenCalledTimes(1); + expect(sendCronFailureAlert).toHaveBeenLastCalledWith( + expect.objectContaining({ + job: expect.objectContaining({ id: job.id }), + channel: "telegram", + to: "19098680", + text: expect.stringContaining('Cron job "daily report" failed 2 times'), + }), + ); + + await cron.run(job.id, "force"); + expect(sendCronFailureAlert).toHaveBeenCalledTimes(1); + + vi.advanceTimersByTime(60_000); + await cron.run(job.id, "force"); + expect(sendCronFailureAlert).toHaveBeenCalledTimes(2); + expect(sendCronFailureAlert).toHaveBeenLastCalledWith( + expect.objectContaining({ + text: expect.stringContaining('Cron job "daily report" failed 4 times'), + }), + ); + + cron.stop(); + await store.cleanup(); + }); + + it("supports per-job failure alert override when global alerts are disabled", async () => { + const store = await makeStorePath(); + const sendCronFailureAlert = vi.fn(async () => undefined); + const runIsolatedAgentJob = vi.fn(async () => ({ + status: "error" as const, + error: "timeout", + })); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + cronConfig: { + failureAlert: { + enabled: false, + }, + }, + log: noopLogger, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob, + sendCronFailureAlert, + }); + + await cron.start(); + const job = await cron.add({ + name: "job with override", + enabled: true, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "run report" }, + failureAlert: { + after: 1, + channel: "telegram", + to: "12345", + cooldownMs: 1, + }, + }); + + await cron.run(job.id, "force"); + expect(sendCronFailureAlert).toHaveBeenCalledTimes(1); + expect(sendCronFailureAlert).toHaveBeenLastCalledWith( + expect.objectContaining({ + channel: "telegram", + to: "12345", + }), + ); + + cron.stop(); + await store.cleanup(); + }); + + it("respects per-job failureAlert=false and suppresses alerts", async () => { + const store = await makeStorePath(); + const sendCronFailureAlert = vi.fn(async () => undefined); + const runIsolatedAgentJob = vi.fn(async () => ({ + status: "error" as const, + error: "auth error", + })); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + cronConfig: { + failureAlert: { + enabled: true, + after: 1, + }, + }, + log: noopLogger, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob, + sendCronFailureAlert, + }); + + await cron.start(); + const job = await cron.add({ + name: "disabled alert job", + enabled: true, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "run report" }, + failureAlert: false, + }); + + await cron.run(job.id, "force"); + await cron.run(job.id, "force"); + expect(sendCronFailureAlert).not.toHaveBeenCalled(); + + cron.stop(); + await store.cleanup(); + }); +}); diff --git a/src/cron/service/jobs.ts b/src/cron/service/jobs.ts index b98a28aed66..5ccca6c43d3 100644 --- a/src/cron/service/jobs.ts +++ b/src/cron/service/jobs.ts @@ -9,6 +9,7 @@ import { import type { CronDelivery, CronDeliveryPatch, + CronFailureAlert, CronJob, CronJobCreate, CronJobPatch, @@ -419,6 +420,7 @@ export function createJob(state: CronServiceState, input: CronJobCreate): CronJo wakeMode: input.wakeMode, payload: input.payload, delivery: input.delivery, + failureAlert: input.failureAlert, state: { ...input.state, }, @@ -483,6 +485,9 @@ export function applyJobPatch(job: CronJob, patch: CronJobPatch) { if (patch.delivery) { job.delivery = mergeCronDelivery(job.delivery, patch.delivery); } + if ("failureAlert" in patch) { + job.failureAlert = mergeCronFailureAlert(job.failureAlert, patch.failureAlert); + } if (job.sessionTarget === "main" && job.delivery?.mode !== "webhook") { job.delivery = undefined; } @@ -648,6 +653,42 @@ function mergeCronDelivery( return next; } +function mergeCronFailureAlert( + existing: CronFailureAlert | false | undefined, + patch: CronFailureAlert | false | undefined, +): CronFailureAlert | false | undefined { + if (patch === false) { + return false; + } + if (patch === undefined) { + return existing; + } + const base = existing === false || existing === undefined ? {} : existing; + const next: CronFailureAlert = { ...base }; + + if ("after" in patch) { + const after = typeof patch.after === "number" && Number.isFinite(patch.after) ? patch.after : 0; + next.after = after > 0 ? Math.floor(after) : undefined; + } + if ("channel" in patch) { + const channel = typeof patch.channel === "string" ? patch.channel.trim() : ""; + next.channel = channel ? channel : undefined; + } + if ("to" in patch) { + const to = typeof patch.to === "string" ? patch.to.trim() : ""; + next.to = to ? to : undefined; + } + if ("cooldownMs" in patch) { + const cooldownMs = + typeof patch.cooldownMs === "number" && Number.isFinite(patch.cooldownMs) + ? patch.cooldownMs + : -1; + next.cooldownMs = cooldownMs >= 0 ? Math.floor(cooldownMs) : undefined; + } + + return next; +} + export function isJobDue(job: CronJob, nowMs: number, opts: { forced: boolean }) { if (!job.state) { job.state = {}; diff --git a/src/cron/service/state.ts b/src/cron/service/state.ts index 604fd842b68..05adbafb274 100644 --- a/src/cron/service/state.ts +++ b/src/cron/service/state.ts @@ -5,6 +5,7 @@ import type { CronJob, CronJobCreate, CronJobPatch, + CronMessageChannel, CronRunOutcome, CronRunStatus, CronRunTelemetry, @@ -90,6 +91,12 @@ export type CronServiceDeps = { } & CronRunOutcome & CronRunTelemetry >; + sendCronFailureAlert?: (params: { + job: CronJob; + text: string; + channel: CronMessageChannel; + to?: string; + }) => Promise; onEvent?: (evt: CronEvent) => void; }; diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 68bcf52cdac..333caabbfb1 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -6,6 +6,7 @@ import { sweepCronRunSessions } from "../session-reaper.js"; import type { CronDeliveryStatus, CronJob, + CronMessageChannel, CronRunOutcome, CronRunStatus, CronRunTelemetry, @@ -33,6 +34,8 @@ const MAX_TIMER_DELAY_MS = 60_000; * but always breaks an infinite re-trigger cycle. (See #17821) */ const MIN_REFIRE_GAP_MS = 2_000; +const DEFAULT_FAILURE_ALERT_AFTER = 2; +const DEFAULT_FAILURE_ALERT_COOLDOWN_MS = 60 * 60_000; // 1 hour type TimedCronRunOutcome = CronRunOutcome & CronRunTelemetry & { @@ -149,6 +152,106 @@ function resolveDeliveryStatus(params: { job: CronJob; delivered?: boolean }): C return resolveCronDeliveryPlan(params.job).requested ? "unknown" : "not-requested"; } +function normalizeCronMessageChannel(input: unknown): CronMessageChannel | undefined { + if (typeof input !== "string") { + return undefined; + } + const channel = input.trim().toLowerCase(); + return channel ? (channel as CronMessageChannel) : undefined; +} + +function normalizeTo(input: unknown): string | undefined { + if (typeof input !== "string") { + return undefined; + } + const to = input.trim(); + return to ? to : undefined; +} + +function clampPositiveInt(value: unknown, fallback: number): number { + if (typeof value !== "number" || !Number.isFinite(value)) { + return fallback; + } + const floored = Math.floor(value); + return floored >= 1 ? floored : fallback; +} + +function clampNonNegativeInt(value: unknown, fallback: number): number { + if (typeof value !== "number" || !Number.isFinite(value)) { + return fallback; + } + const floored = Math.floor(value); + return floored >= 0 ? floored : fallback; +} + +function resolveFailureAlert( + state: CronServiceState, + job: CronJob, +): { after: number; cooldownMs: number; channel: CronMessageChannel; to?: string } | null { + const globalConfig = state.deps.cronConfig?.failureAlert; + const jobConfig = job.failureAlert === false ? undefined : job.failureAlert; + + if (job.failureAlert === false) { + return null; + } + if (!jobConfig && globalConfig?.enabled !== true) { + return null; + } + + return { + after: clampPositiveInt(jobConfig?.after ?? globalConfig?.after, DEFAULT_FAILURE_ALERT_AFTER), + cooldownMs: clampNonNegativeInt( + jobConfig?.cooldownMs ?? globalConfig?.cooldownMs, + DEFAULT_FAILURE_ALERT_COOLDOWN_MS, + ), + channel: + normalizeCronMessageChannel(jobConfig?.channel) ?? + normalizeCronMessageChannel(job.delivery?.channel) ?? + "last", + to: normalizeTo(jobConfig?.to) ?? normalizeTo(job.delivery?.to), + }; +} + +function emitFailureAlert( + state: CronServiceState, + params: { + job: CronJob; + error?: string; + consecutiveErrors: number; + channel: CronMessageChannel; + to?: string; + }, +) { + const safeJobName = params.job.name || params.job.id; + const truncatedError = (params.error?.trim() || "unknown error").slice(0, 200); + const text = [ + `Cron job "${safeJobName}" failed ${params.consecutiveErrors} times`, + `Last error: ${truncatedError}`, + ].join("\n"); + + if (state.deps.sendCronFailureAlert) { + void state.deps + .sendCronFailureAlert({ + job: params.job, + text, + channel: params.channel, + to: params.to, + }) + .catch((err) => { + state.deps.log.warn( + { jobId: params.job.id, err: String(err) }, + "cron: failure alert delivery failed", + ); + }); + return; + } + + state.deps.enqueueSystemEvent(text, { agentId: params.job.agentId }); + if (params.job.wakeMode === "now") { + state.deps.requestHeartbeatNow({ reason: `cron:${params.job.id}:failure-alert` }); + } +} + /** * Apply the result of a job execution to the job's state. * Handles consecutive error tracking, exponential backoff, one-shot disable, @@ -181,8 +284,26 @@ export function applyJobResult( // Track consecutive errors for backoff / auto-disable. if (result.status === "error") { job.state.consecutiveErrors = (job.state.consecutiveErrors ?? 0) + 1; + const alertConfig = resolveFailureAlert(state, job); + if (alertConfig && job.state.consecutiveErrors >= alertConfig.after) { + const now = state.deps.nowMs(); + const lastAlert = job.state.lastFailureAlertAtMs; + const inCooldown = + typeof lastAlert === "number" && now - lastAlert < Math.max(0, alertConfig.cooldownMs); + if (!inCooldown) { + emitFailureAlert(state, { + job, + error: result.error, + consecutiveErrors: job.state.consecutiveErrors, + channel: alertConfig.channel, + to: alertConfig.to, + }); + job.state.lastFailureAlertAtMs = now; + } + } } else { job.state.consecutiveErrors = 0; + job.state.lastFailureAlertAtMs = undefined; } const shouldDelete = diff --git a/src/cron/types.ts b/src/cron/types.ts index fc3a89ec6a3..401e07e6f5b 100644 --- a/src/cron/types.ts +++ b/src/cron/types.ts @@ -56,6 +56,13 @@ export type CronRunOutcome = { sessionKey?: string; }; +export type CronFailureAlert = { + after?: number; + channel?: CronMessageChannel; + to?: string; + cooldownMs?: number; +}; + export type CronPayload = | { kind: "systemEvent"; text: string } | { @@ -102,6 +109,8 @@ export type CronJobState = { lastDurationMs?: number; /** Number of consecutive execution errors (reset on success). Used for backoff. */ consecutiveErrors?: number; + /** Last failure alert timestamp (ms since epoch) for cooldown gating. */ + lastFailureAlertAtMs?: number; /** Number of consecutive schedule computation errors. Auto-disables job after threshold. */ scheduleErrorCount?: number; /** Explicit delivery outcome, separate from execution outcome. */ @@ -128,6 +137,7 @@ export type CronJob = { wakeMode: CronWakeMode; payload: CronPayload; delivery?: CronDelivery; + failureAlert?: CronFailureAlert | false; state: CronJobState; }; diff --git a/src/gateway/protocol/schema/cron.ts b/src/gateway/protocol/schema/cron.ts index 77238464bd8..8a47e1ff36d 100644 --- a/src/gateway/protocol/schema/cron.ts +++ b/src/gateway/protocol/schema/cron.ts @@ -187,6 +187,16 @@ export const CronDeliveryPatchSchema = Type.Object( { additionalProperties: false }, ); +export const CronFailureAlertSchema = Type.Object( + { + after: Type.Optional(Type.Integer({ minimum: 1 })), + channel: Type.Optional(Type.Union([Type.Literal("last"), NonEmptyString])), + to: Type.Optional(Type.String()), + cooldownMs: Type.Optional(Type.Integer({ minimum: 0 })), + }, + { additionalProperties: false }, +); + export const CronJobStateSchema = Type.Object( { nextRunAtMs: Type.Optional(Type.Integer({ minimum: 0 })), @@ -200,6 +210,7 @@ export const CronJobStateSchema = Type.Object( lastDelivered: Type.Optional(Type.Boolean()), lastDeliveryStatus: Type.Optional(CronDeliveryStatusSchema), lastDeliveryError: Type.Optional(Type.String()), + lastFailureAlertAtMs: Type.Optional(Type.Integer({ minimum: 0 })), }, { additionalProperties: false }, ); @@ -220,6 +231,7 @@ export const CronJobSchema = Type.Object( wakeMode: CronWakeModeSchema, payload: CronPayloadSchema, delivery: Type.Optional(CronDeliverySchema), + failureAlert: Type.Optional(Type.Union([Type.Literal(false), CronFailureAlertSchema])), state: CronJobStateSchema, }, { additionalProperties: false }, @@ -249,6 +261,7 @@ export const CronAddParamsSchema = Type.Object( wakeMode: CronWakeModeSchema, payload: CronPayloadSchema, delivery: Type.Optional(CronDeliverySchema), + failureAlert: Type.Optional(Type.Union([Type.Literal(false), CronFailureAlertSchema])), }, { additionalProperties: false }, ); @@ -262,6 +275,7 @@ export const CronJobPatchSchema = Type.Object( wakeMode: Type.Optional(CronWakeModeSchema), payload: Type.Optional(CronPayloadPatchSchema), delivery: Type.Optional(CronDeliveryPatchSchema), + failureAlert: Type.Optional(Type.Union([Type.Literal(false), CronFailureAlertSchema])), state: Type.Optional(Type.Partial(CronJobStateSchema)), }, { additionalProperties: false }, diff --git a/src/gateway/server-cron.ts b/src/gateway/server-cron.ts index dbfb8350b87..72cf2a2794a 100644 --- a/src/gateway/server-cron.ts +++ b/src/gateway/server-cron.ts @@ -1,5 +1,6 @@ import { resolveDefaultAgentId } from "../agents/agent-scope.js"; import type { CliDeps } from "../cli/deps.js"; +import { createOutboundSendDeps } from "../cli/outbound-send-deps.js"; import { loadConfig } from "../config/config.js"; import { canonicalizeMainSessionAlias, @@ -8,6 +9,7 @@ import { } from "../config/sessions.js"; import { resolveStorePath } from "../config/sessions/paths.js"; import { runCronIsolatedAgentTurn } from "../cron/isolated-agent.js"; +import { resolveDeliveryTarget } from "../cron/isolated-agent/delivery-target.js"; import { appendCronRunLog, resolveCronRunLogPath, @@ -21,6 +23,7 @@ import { runHeartbeatOnce } from "../infra/heartbeat-runner.js"; import { requestHeartbeatNow } from "../infra/heartbeat-wake.js"; import { fetchWithSsrFGuard } from "../infra/net/fetch-guard.js"; import { SsrFBlockedError } from "../infra/net/ssrf.js"; +import { deliverOutboundPayloads } from "../infra/outbound/deliver.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; import { getChildLogger } from "../logging.js"; import { normalizeAgentId, toAgentStoreSessionKey } from "../routing/session-key.js"; @@ -223,6 +226,25 @@ export function buildGatewayCronService(params: { lane: "cron", }); }, + sendCronFailureAlert: async ({ job, text, channel, to }) => { + const { agentId, cfg: runtimeConfig } = resolveCronAgent(job.agentId); + const target = await resolveDeliveryTarget(runtimeConfig, agentId, { + channel, + to, + }); + if (!target.ok) { + throw target.error; + } + await deliverOutboundPayloads({ + cfg: runtimeConfig, + channel: target.channel, + to: target.to, + accountId: target.accountId, + threadId: target.threadId, + payloads: [{ text }], + deps: createOutboundSendDeps(params.deps), + }); + }, log: getChildLogger({ module: "cron", storePath }), onEvent: (evt) => { params.broadcast("cron", evt, { dropIfSlow: true }); diff --git a/ui/src/ui/app-defaults.ts b/ui/src/ui/app-defaults.ts index ba8edc45106..b3661b18e77 100644 --- a/ui/src/ui/app-defaults.ts +++ b/ui/src/ui/app-defaults.ts @@ -36,5 +36,10 @@ export const DEFAULT_CRON_FORM: CronFormState = { deliveryChannel: "last", deliveryTo: "", deliveryBestEffort: false, + failureAlertMode: "inherit", + failureAlertAfter: "2", + failureAlertCooldownSeconds: "3600", + failureAlertChannel: "last", + failureAlertTo: "", timeoutSeconds: "", }; diff --git a/ui/src/ui/controllers/cron.test.ts b/ui/src/ui/controllers/cron.test.ts index f94e4dd1f93..576ba873b8f 100644 --- a/ui/src/ui/controllers/cron.test.ts +++ b/ui/src/ui/controllers/cron.test.ts @@ -298,6 +298,87 @@ describe("cron controller", () => { }); }); + it("includes custom failureAlert fields in cron.update patch", async () => { + const request = vi.fn(async (method: string, _payload?: unknown) => { + if (method === "cron.update") { + return { id: "job-alert" }; + } + if (method === "cron.list") { + return { jobs: [{ id: "job-alert" }] }; + } + if (method === "cron.status") { + return { enabled: true, jobs: 1, nextWakeAtMs: null }; + } + return {}; + }); + const state = createState({ + client: { request } as unknown as CronState["client"], + cronEditingJobId: "job-alert", + cronForm: { + ...DEFAULT_CRON_FORM, + name: "alert job", + payloadKind: "agentTurn", + payloadText: "run it", + failureAlertMode: "custom", + failureAlertAfter: "3", + failureAlertCooldownSeconds: "120", + failureAlertChannel: "telegram", + failureAlertTo: "123456", + }, + }); + + await addCronJob(state); + + const updateCall = request.mock.calls.find(([method]) => method === "cron.update"); + expect(updateCall).toBeDefined(); + expect(updateCall?.[1]).toMatchObject({ + id: "job-alert", + patch: { + failureAlert: { + after: 3, + cooldownMs: 120_000, + channel: "telegram", + to: "123456", + }, + }, + }); + }); + + it("includes failureAlert=false when disabled per job", async () => { + const request = vi.fn(async (method: string, _payload?: unknown) => { + if (method === "cron.update") { + return { id: "job-no-alert" }; + } + if (method === "cron.list") { + return { jobs: [{ id: "job-no-alert" }] }; + } + if (method === "cron.status") { + return { enabled: true, jobs: 1, nextWakeAtMs: null }; + } + return {}; + }); + const state = createState({ + client: { request } as unknown as CronState["client"], + cronEditingJobId: "job-no-alert", + cronForm: { + ...DEFAULT_CRON_FORM, + name: "alert off", + payloadKind: "agentTurn", + payloadText: "run it", + failureAlertMode: "disabled", + }, + }); + + await addCronJob(state); + + const updateCall = request.mock.calls.find(([method]) => method === "cron.update"); + expect(updateCall).toBeDefined(); + expect(updateCall?.[1]).toMatchObject({ + id: "job-no-alert", + patch: { failureAlert: false }, + }); + }); + it("maps cron stagger, model, thinking, and best effort into form", () => { const state = createState(); const job = { @@ -331,6 +412,36 @@ describe("cron controller", () => { expect(state.cronForm.deliveryBestEffort).toBe(true); }); + it("maps failureAlert overrides into form fields", () => { + const state = createState(); + const job = { + id: "job-11", + name: "Failure alerts", + enabled: true, + createdAtMs: 0, + updatedAtMs: 0, + schedule: { kind: "every" as const, everyMs: 60_000 }, + sessionTarget: "isolated" as const, + wakeMode: "next-heartbeat" as const, + payload: { kind: "agentTurn" as const, message: "hello" }, + failureAlert: { + after: 4, + cooldownMs: 30_000, + channel: "telegram", + to: "999", + }, + state: {}, + }; + + startCronEdit(state, job); + + expect(state.cronForm.failureAlertMode).toBe("custom"); + expect(state.cronForm.failureAlertAfter).toBe("4"); + expect(state.cronForm.failureAlertCooldownSeconds).toBe("30"); + expect(state.cronForm.failureAlertChannel).toBe("telegram"); + expect(state.cronForm.failureAlertTo).toBe("999"); + }); + it("validates key cron form errors", () => { const errors = validateCronForm({ ...DEFAULT_CRON_FORM, diff --git a/ui/src/ui/controllers/cron.ts b/ui/src/ui/controllers/cron.ts index 286c84f38d4..1de4f0ec9f2 100644 --- a/ui/src/ui/controllers/cron.ts +++ b/ui/src/ui/controllers/cron.ts @@ -29,7 +29,9 @@ export type CronFieldKey = | "payloadModel" | "payloadThinking" | "timeoutSeconds" - | "deliveryTo"; + | "deliveryTo" + | "failureAlertAfter" + | "failureAlertCooldownSeconds"; export type CronFieldErrors = Partial>; @@ -145,6 +147,22 @@ export function validateCronForm(form: CronFormState): CronFieldErrors { errors.deliveryTo = "cron.errors.webhookUrlInvalid"; } } + if (form.failureAlertMode === "custom") { + const afterRaw = form.failureAlertAfter.trim(); + if (afterRaw) { + const after = toNumber(afterRaw, 0); + if (!Number.isFinite(after) || after <= 0) { + errors.failureAlertAfter = "Failure alert threshold must be greater than 0."; + } + } + const cooldownRaw = form.failureAlertCooldownSeconds.trim(); + if (cooldownRaw) { + const cooldown = toNumber(cooldownRaw, -1); + if (!Number.isFinite(cooldown) || cooldown < 0) { + errors.failureAlertCooldownSeconds = "Cooldown must be 0 or greater."; + } + } + } return errors; } @@ -374,6 +392,7 @@ function parseStaggerSchedule( } function jobToForm(job: CronJob, prev: CronFormState): CronFormState { + const failureAlert = job.failureAlert; const next: CronFormState = { ...prev, name: job.name, @@ -401,6 +420,27 @@ function jobToForm(job: CronJob, prev: CronFormState): CronFormState { deliveryChannel: job.delivery?.channel ?? CRON_CHANNEL_LAST, deliveryTo: job.delivery?.to ?? "", deliveryBestEffort: job.delivery?.bestEffort ?? false, + failureAlertMode: + failureAlert === false + ? "disabled" + : failureAlert && typeof failureAlert === "object" + ? "custom" + : "inherit", + failureAlertAfter: + failureAlert && typeof failureAlert === "object" && typeof failureAlert.after === "number" + ? String(failureAlert.after) + : DEFAULT_CRON_FORM.failureAlertAfter, + failureAlertCooldownSeconds: + failureAlert && + typeof failureAlert === "object" && + typeof failureAlert.cooldownMs === "number" + ? String(Math.floor(failureAlert.cooldownMs / 1000)) + : DEFAULT_CRON_FORM.failureAlertCooldownSeconds, + failureAlertChannel: + failureAlert && typeof failureAlert === "object" + ? (failureAlert.channel ?? CRON_CHANNEL_LAST) + : CRON_CHANNEL_LAST, + failureAlertTo: failureAlert && typeof failureAlert === "object" ? (failureAlert.to ?? "") : "", timeoutSeconds: job.payload.kind === "agentTurn" && typeof job.payload.timeoutSeconds === "number" ? String(job.payload.timeoutSeconds) @@ -495,6 +535,26 @@ export function buildCronPayload(form: CronFormState) { return payload; } +function buildFailureAlert(form: CronFormState) { + if (form.failureAlertMode === "disabled") { + return false as const; + } + if (form.failureAlertMode !== "custom") { + return undefined; + } + const after = toNumber(form.failureAlertAfter.trim(), 0); + const cooldownSeconds = toNumber(form.failureAlertCooldownSeconds.trim(), 0); + return { + after: after > 0 ? Math.floor(after) : undefined, + channel: form.failureAlertChannel.trim() || CRON_CHANNEL_LAST, + to: form.failureAlertTo.trim() || undefined, + cooldownMs: + Number.isFinite(cooldownSeconds) && cooldownSeconds >= 0 + ? Math.floor(cooldownSeconds * 1000) + : undefined, + }; +} + export async function addCronJob(state: CronState) { if (!state.client || !state.connected || state.cronBusy) { return; @@ -527,6 +587,7 @@ export async function addCronJob(state: CronState) { bestEffort: form.deliveryBestEffort, } : undefined; + const failureAlert = buildFailureAlert(form); const agentId = form.clearAgent ? null : form.agentId.trim(); const job = { name: form.name.trim(), @@ -539,6 +600,7 @@ export async function addCronJob(state: CronState) { wakeMode: form.wakeMode, payload, delivery, + failureAlert, }; if (!job.name) { throw new Error(t("cron.errors.nameRequiredShort")); diff --git a/ui/src/ui/types.ts b/ui/src/ui/types.ts index 3c4091479b4..23b34bde627 100644 --- a/ui/src/ui/types.ts +++ b/ui/src/ui/types.ts @@ -491,6 +491,13 @@ export type CronDelivery = { bestEffort?: boolean; }; +export type CronFailureAlert = { + after?: number; + channel?: string; + to?: string; + cooldownMs?: number; +}; + export type CronJobState = { nextRunAtMs?: number; runningAtMs?: number; @@ -498,6 +505,7 @@ export type CronJobState = { lastStatus?: "ok" | "error" | "skipped"; lastError?: string; lastDurationMs?: number; + lastFailureAlertAtMs?: number; }; export type CronJob = { @@ -514,6 +522,7 @@ export type CronJob = { wakeMode: CronWakeMode; payload: CronPayload; delivery?: CronDelivery; + failureAlert?: CronFailureAlert | false; state?: CronJobState; }; diff --git a/ui/src/ui/ui-types.ts b/ui/src/ui/ui-types.ts index f1087546c79..c179bdea1cb 100644 --- a/ui/src/ui/ui-types.ts +++ b/ui/src/ui/ui-types.ts @@ -40,5 +40,10 @@ export type CronFormState = { deliveryChannel: string; deliveryTo: string; deliveryBestEffort: boolean; + failureAlertMode: "inherit" | "disabled" | "custom"; + failureAlertAfter: string; + failureAlertCooldownSeconds: string; + failureAlertChannel: string; + failureAlertTo: string; timeoutSeconds: string; }; diff --git a/ui/src/ui/views/cron.ts b/ui/src/ui/views/cron.ts index fbcc942bf42..a9606cd6fbd 100644 --- a/ui/src/ui/views/cron.ts +++ b/ui/src/ui/views/cron.ts @@ -239,6 +239,12 @@ function inputIdForField(key: CronFieldKey) { if (key === "timeoutSeconds") { return "cron-timeout-seconds"; } + if (key === "failureAlertAfter") { + return "cron-failure-alert-after"; + } + if (key === "failureAlertCooldownSeconds") { + return "cron-failure-alert-cooldown-seconds"; + } return "cron-delivery-to"; } @@ -266,6 +272,8 @@ function fieldLabelForKey( payloadThinking: t("cron.form.thinking"), timeoutSeconds: t("cron.form.timeoutSeconds"), deliveryTo: t("cron.form.to"), + failureAlertAfter: "Failure alert after", + failureAlertCooldownSeconds: "Failure alert cooldown", }; return labels[key]; } @@ -286,6 +294,8 @@ function collectBlockingFields( "payloadThinking", "timeoutSeconds", "deliveryTo", + "failureAlertAfter", + "failureAlertCooldownSeconds", ]; const fields: BlockingField[] = []; for (const key of orderedKeys) { @@ -1057,6 +1067,115 @@ export function renderCron(props: CronProps) { ` : nothing } + ${ + isAgentTurn + ? html` + + ${ + props.form.failureAlertMode === "custom" + ? html` + + + + + ` + : nothing + } + ` + : nothing + } ${ selectedDeliveryMode !== "none" ? html`