diff --git a/CHANGELOG.md b/CHANGELOG.md index 9bd2d9ea67c..8bef2939633 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -202,6 +202,7 @@ Docs: https://docs.openclaw.ai - Discord/Application ID fallback: parse bot application IDs from token prefixes without numeric precision loss and use token fallback only on transport/timeout failures when probing `/oauth2/applications/@me`. Landed from contributor PR #29695 by @dhananjai1729. Thanks @dhananjai1729. - Discord/EventQueue timeout config: expose per-account `channels.discord.accounts..eventQueue.listenerTimeout` (and related queue options) so long-running handlers can avoid Carbon listener timeout drops. Landed from contributor PR #28945 by @Glucksberg. Thanks @Glucksberg. - CLI/Cron run exit code: return exit code `0` only when `cron run` reports `{ ok: true, ran: true }`, and `1` for non-run/error outcomes so scripting/debugging reflects actual execution status. Landed from contributor PR #31121 by @Sid-Qin. Thanks @Sid-Qin. +- Cron/Failure delivery routing: add `failureAlert.mode` (`announce|webhook`) and `failureAlert.accountId` support, plus `cron.failureDestination` and per-job `delivery.failureDestination` routing with duplicate-target suppression, best-effort skip behavior, and global+job merge semantics. Landed from contributor PR #31059 by @kesor. Thanks @kesor. - CLI/JSON preflight output: keep `--json` command stdout machine-readable by suppressing doctor preflight note output while still running legacy migration/config doctor flow. (#24368) Thanks @altaywtf. - Nodes/Screen recording guardrails: cap `nodes` tool `screen_record` `durationMs` to 5 minutes at both schema-validation and runtime invocation layers to prevent long-running blocking captures from unbounded durations. Landed from contributor PR #31106 by @BlueBirdBack. Thanks @BlueBirdBack. - Telegram/Empty final replies: skip outbound send for null/undefined final text payloads without media so Telegram typing indicators do not linger on `text must be non-empty` errors, with added regression coverage for undefined final payload dispatch. Landed from contributor PRs #30969 by @haosenwang1018 and #30746 by @rylena. Thanks @haosenwang1018 and @rylena. diff --git a/src/cli/cron-cli.test.ts b/src/cli/cron-cli.test.ts index 4ebb6736106..562a239385d 100644 --- a/src/cli/cron-cli.test.ts +++ b/src/cli/cron-cli.test.ts @@ -679,19 +679,39 @@ describe("cron cli", () => { expect(patch?.patch?.failureAlert).toBe(false); }); - it("uses a longer default timeout for cron run", async () => { - const { runOpts } = await runCronRunAndCaptureExit({ - ran: true, - args: ["cron", "run", "job-1", "--expect-final"], - }); - expect(runOpts.timeout).toBe("600000"); - }); + it("patches failure alert mode/accountId on cron edit", async () => { + callGatewayFromCli.mockClear(); - it("preserves explicit --timeout for cron run", async () => { - const { runOpts } = await runCronRunAndCaptureExit({ - ran: true, - args: ["cron", "run", "job-1", "--expect-final", "--timeout", "45000"], - }); - expect(runOpts.timeout).toBe("45000"); + const program = buildProgram(); + + await program.parseAsync( + [ + "cron", + "edit", + "job-1", + "--failure-alert-after", + "1", + "--failure-alert-mode", + "webhook", + "--failure-alert-account-id", + "bot-a", + ], + { from: "user" }, + ); + + const updateCall = callGatewayFromCli.mock.calls.find((call) => call[0] === "cron.update"); + const patch = updateCall?.[2] as { + patch?: { + failureAlert?: { + after?: number; + mode?: "announce" | "webhook"; + accountId?: string; + }; + }; + }; + + expect(patch?.patch?.failureAlert?.after).toBe(1); + expect(patch?.patch?.failureAlert?.mode).toBe("webhook"); + expect(patch?.patch?.failureAlert?.accountId).toBe("bot-a"); }); }); diff --git a/src/cli/cron-cli/register.cron-edit.ts b/src/cli/cron-cli/register.cron-edit.ts index a7c21f8750b..9670c65cb29 100644 --- a/src/cli/cron-cli/register.cron-edit.ts +++ b/src/cli/cron-cli/register.cron-edit.ts @@ -73,6 +73,11 @@ export function registerCronEditCommand(cron: Command) { ) .option("--failure-alert-to ", "Failure alert destination") .option("--failure-alert-cooldown ", "Minimum time between alerts (e.g. 1h, 30m)") + .option("--failure-alert-mode ", "Failure alert delivery mode (announce or webhook)") + .option( + "--failure-alert-account-id ", + "Account ID for failure alert channel (multi-account setups)", + ) .action(async (id, opts) => { try { if (opts.session === "main" && opts.message) { @@ -286,11 +291,15 @@ export function registerCronEditCommand(cron: Command) { const hasFailureAlertChannel = typeof opts.failureAlertChannel === "string"; const hasFailureAlertTo = typeof opts.failureAlertTo === "string"; const hasFailureAlertCooldown = typeof opts.failureAlertCooldown === "string"; + const hasFailureAlertMode = typeof opts.failureAlertMode === "string"; + const hasFailureAlertAccountId = typeof opts.failureAlertAccountId === "string"; const hasFailureAlertFields = hasFailureAlertAfter || hasFailureAlertChannel || hasFailureAlertTo || - hasFailureAlertCooldown; + hasFailureAlertCooldown || + hasFailureAlertMode || + hasFailureAlertAccountId; const failureAlertFlag = typeof opts.failureAlert === "boolean" ? opts.failureAlert : undefined; if (failureAlertFlag === false && hasFailureAlertFields) { @@ -322,6 +331,17 @@ export function registerCronEditCommand(cron: Command) { } failureAlert.cooldownMs = cooldownMs; } + if (hasFailureAlertMode) { + const mode = String(opts.failureAlertMode).trim().toLowerCase(); + if (mode !== "announce" && mode !== "webhook") { + throw new Error("Invalid --failure-alert-mode (must be 'announce' or 'webhook')."); + } + failureAlert.mode = mode; + } + if (hasFailureAlertAccountId) { + const accountId = String(opts.failureAlertAccountId).trim(); + failureAlert.accountId = accountId ? accountId : undefined; + } patch.failureAlert = failureAlert; } diff --git a/src/config/types.cron.ts b/src/config/types.cron.ts index 427b1044477..2e44ec9c92e 100644 --- a/src/config/types.cron.ts +++ b/src/config/types.cron.ts @@ -14,6 +14,15 @@ export type CronFailureAlertConfig = { enabled?: boolean; after?: number; cooldownMs?: number; + mode?: "announce" | "webhook"; + accountId?: string; +}; + +export type CronFailureDestinationConfig = { + channel?: string; + to?: string; + accountId?: string; + mode?: "announce" | "webhook"; }; export type CronConfig = { @@ -44,4 +53,6 @@ export type CronConfig = { keepLines?: number; }; failureAlert?: CronFailureAlertConfig; + /** Default destination for failure notifications across all cron jobs. */ + failureDestination?: CronFailureDestinationConfig; }; diff --git a/src/config/zod-schema.ts b/src/config/zod-schema.ts index 2d57fb49a06..5b2cf7d075c 100644 --- a/src/config/zod-schema.ts +++ b/src/config/zod-schema.ts @@ -417,6 +417,17 @@ export const OpenClawSchema = z enabled: z.boolean().optional(), after: z.number().int().min(1).optional(), cooldownMs: z.number().int().min(0).optional(), + mode: z.enum(["announce", "webhook"]).optional(), + accountId: z.string().optional(), + }) + .strict() + .optional(), + failureDestination: z + .object({ + channel: z.string().optional(), + to: z.string().optional(), + accountId: z.string().optional(), + mode: z.enum(["announce", "webhook"]).optional(), }) .strict() .optional(), diff --git a/src/cron/delivery.test.ts b/src/cron/delivery.test.ts index 7cc690f79cf..81ab672af57 100644 --- a/src/cron/delivery.test.ts +++ b/src/cron/delivery.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it } from "vitest"; -import { resolveCronDeliveryPlan } from "./delivery.js"; +import { resolveCronDeliveryPlan, resolveFailureDestination } from "./delivery.js"; import type { CronJob } from "./types.js"; function makeJob(overrides: Partial): CronJob { @@ -85,3 +85,96 @@ describe("resolveCronDeliveryPlan", () => { expect(plan.accountId).toBe("bot-a"); }); }); + +describe("resolveFailureDestination", () => { + it("merges global defaults with job-level overrides", () => { + const plan = resolveFailureDestination( + makeJob({ + delivery: { + mode: "announce", + channel: "telegram", + to: "111", + failureDestination: { channel: "signal", mode: "announce" }, + }, + }), + { + channel: "telegram", + to: "222", + mode: "announce", + accountId: "global-account", + }, + ); + expect(plan).toEqual({ + mode: "announce", + channel: "signal", + to: "222", + accountId: "global-account", + }); + }); + + it("returns null for webhook mode without destination URL", () => { + const plan = resolveFailureDestination( + makeJob({ + delivery: { + mode: "announce", + channel: "telegram", + to: "111", + failureDestination: { mode: "webhook" }, + }, + }), + undefined, + ); + expect(plan).toBeNull(); + }); + + it("returns null when failure destination matches primary delivery target", () => { + const plan = resolveFailureDestination( + makeJob({ + delivery: { + mode: "announce", + channel: "telegram", + to: "111", + accountId: "bot-a", + failureDestination: { + mode: "announce", + channel: "telegram", + to: "111", + accountId: "bot-a", + }, + }, + }), + undefined, + ); + expect(plan).toBeNull(); + }); + + it("allows job-level failure destination fields to clear inherited global values", () => { + const plan = resolveFailureDestination( + makeJob({ + delivery: { + mode: "announce", + channel: "telegram", + to: "111", + failureDestination: { + mode: "announce", + channel: undefined as never, + to: undefined as never, + accountId: undefined as never, + }, + }, + }), + { + channel: "signal", + to: "group-abc", + accountId: "global-account", + mode: "announce", + }, + ); + expect(plan).toEqual({ + mode: "announce", + channel: "last", + to: undefined, + accountId: undefined, + }); + }); +}); diff --git a/src/cron/delivery.ts b/src/cron/delivery.ts index 53e3450ab72..9d502a74fcb 100644 --- a/src/cron/delivery.ts +++ b/src/cron/delivery.ts @@ -1,4 +1,14 @@ -import type { CronDeliveryMode, CronJob, CronMessageChannel } from "./types.js"; +import type { CliDeps } from "../cli/deps.js"; +import { createOutboundSendDeps } from "../cli/outbound-send-deps.js"; +import type { CronFailureDestinationConfig } from "../config/types.cron.js"; +import type { OpenClawConfig } from "../config/types.js"; +import { formatErrorMessage } from "../infra/errors.js"; +import { deliverOutboundPayloads } from "../infra/outbound/deliver.js"; +import { resolveAgentOutboundIdentity } from "../infra/outbound/identity.js"; +import { buildOutboundSessionContext } from "../infra/outbound/session-context.js"; +import { getChildLogger } from "../logging.js"; +import { resolveDeliveryTarget } from "./isolated-agent/delivery-target.js"; +import type { CronDelivery, CronDeliveryMode, CronJob, CronMessageChannel } from "./types.js"; export type CronDeliveryPlan = { mode: CronDeliveryMode; @@ -90,3 +100,202 @@ export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan { requested, }; } + +export type CronFailureDeliveryPlan = { + mode: "announce" | "webhook"; + channel?: CronMessageChannel; + to?: string; + accountId?: string; +}; + +export type CronFailureDestinationInput = { + channel?: CronMessageChannel; + to?: string; + accountId?: string; + mode?: "announce" | "webhook"; +}; + +function normalizeFailureMode(value: unknown): "announce" | "webhook" | undefined { + if (typeof value !== "string") { + return undefined; + } + const trimmed = value.trim().toLowerCase(); + if (trimmed === "announce" || trimmed === "webhook") { + return trimmed; + } + return undefined; +} + +export function resolveFailureDestination( + job: CronJob, + globalConfig?: CronFailureDestinationConfig, +): CronFailureDeliveryPlan | null { + const delivery = job.delivery; + const jobFailureDest = delivery?.failureDestination as CronFailureDestinationInput | undefined; + const hasJobFailureDest = jobFailureDest && typeof jobFailureDest === "object"; + + let channel: CronMessageChannel | undefined; + let to: string | undefined; + let accountId: string | undefined; + let mode: "announce" | "webhook" | undefined; + + // Start with global config as base + if (globalConfig) { + channel = normalizeChannel(globalConfig.channel); + to = normalizeTo(globalConfig.to); + accountId = normalizeAccountId(globalConfig.accountId); + mode = normalizeFailureMode(globalConfig.mode); + } + + // Override with job-level values if present + if (hasJobFailureDest) { + const jobChannel = normalizeChannel(jobFailureDest.channel); + const jobTo = normalizeTo(jobFailureDest.to); + const jobAccountId = normalizeAccountId(jobFailureDest.accountId); + const jobMode = normalizeFailureMode(jobFailureDest.mode); + const hasJobChannelField = "channel" in jobFailureDest; + const hasJobToField = "to" in jobFailureDest; + const hasJobAccountIdField = "accountId" in jobFailureDest; + + // Track if 'to' was explicitly set at job level + const jobToExplicitValue = hasJobToField && jobTo !== undefined; + + // Respect explicit clears from partial patches. + if (hasJobChannelField) { + channel = jobChannel; + } + if (hasJobToField) { + to = jobTo; + } + if (hasJobAccountIdField) { + accountId = jobAccountId; + } + if (jobMode !== undefined) { + // Mode was explicitly overridden - clear inherited 'to' since URL semantics differ + // between announce (channel recipient) and webhook (HTTP endpoint) + // But preserve explicit 'to' that was set at job level + // Treat undefined global mode as "announce" for comparison + const globalMode = globalConfig?.mode ?? "announce"; + if (!jobToExplicitValue && globalMode !== jobMode) { + to = undefined; + } + mode = jobMode; + } + } + + if (!channel && !to && !accountId && !mode) { + return null; + } + + const resolvedMode = mode ?? "announce"; + + // Webhook mode requires a URL + if (resolvedMode === "webhook" && !to) { + return null; + } + + const result: CronFailureDeliveryPlan = { + mode: resolvedMode, + channel: resolvedMode === "announce" ? (channel ?? "last") : undefined, + to, + accountId, + }; + + if (delivery && isSameDeliveryTarget(delivery, result)) { + return null; + } + + return result; +} + +function isSameDeliveryTarget( + delivery: CronDelivery, + failurePlan: CronFailureDeliveryPlan, +): boolean { + const primaryMode = delivery.mode ?? "announce"; + if (primaryMode === "none") { + return false; + } + + const primaryChannel = delivery.channel; + const primaryTo = delivery.to; + const primaryAccountId = delivery.accountId; + + if (failurePlan.mode === "webhook") { + return primaryMode === "webhook" && primaryTo === failurePlan.to; + } + + const primaryChannelNormalized = primaryChannel ?? "last"; + const failureChannelNormalized = failurePlan.channel ?? "last"; + + return ( + failureChannelNormalized === primaryChannelNormalized && + failurePlan.to === primaryTo && + failurePlan.accountId === primaryAccountId + ); +} + +const FAILURE_NOTIFICATION_TIMEOUT_MS = 30_000; +const cronDeliveryLogger = getChildLogger({ subsystem: "cron-delivery" }); + +export async function sendFailureNotificationAnnounce( + deps: CliDeps, + cfg: OpenClawConfig, + agentId: string, + jobId: string, + target: { channel?: string; to?: string; accountId?: string }, + message: string, +): Promise { + const resolvedTarget = await resolveDeliveryTarget(cfg, agentId, { + channel: target.channel as CronMessageChannel | undefined, + to: target.to, + accountId: target.accountId, + }); + + if (!resolvedTarget.ok) { + cronDeliveryLogger.warn( + { error: resolvedTarget.error.message }, + "cron: failed to resolve failure destination target", + ); + return; + } + + const identity = resolveAgentOutboundIdentity(cfg, agentId); + const session = buildOutboundSessionContext({ + cfg, + agentId, + sessionKey: `cron:${jobId}:failure`, + }); + + const abortController = new AbortController(); + const timeout = setTimeout(() => { + abortController.abort(); + }, FAILURE_NOTIFICATION_TIMEOUT_MS); + + try { + await deliverOutboundPayloads({ + cfg, + channel: resolvedTarget.channel, + to: resolvedTarget.to, + accountId: resolvedTarget.accountId, + threadId: resolvedTarget.threadId, + payloads: [{ text: message }], + session, + identity, + bestEffort: false, + deps: createOutboundSendDeps(deps), + abortSignal: abortController.signal, + }); + } catch (err) { + cronDeliveryLogger.warn( + { + err: formatErrorMessage(err), + channel: resolvedTarget.channel, + to: resolvedTarget.to, + }, + "cron: failure destination announce failed", + ); + } finally { + clearTimeout(timeout); + } +} diff --git a/src/cron/service.failure-alert.test.ts b/src/cron/service.failure-alert.test.ts index 49ddac71409..6cfa9780074 100644 --- a/src/cron/service.failure-alert.test.ts +++ b/src/cron/service.failure-alert.test.ts @@ -195,4 +195,72 @@ describe("CronService failure alerts", () => { cron.stop(); await store.cleanup(); }); + + it("threads failure alert mode/accountId and skips best-effort jobs", async () => { + const store = await makeStorePath(); + const sendCronFailureAlert = vi.fn(async () => undefined); + const runIsolatedAgentJob = vi.fn(async () => ({ + status: "error" as const, + error: "temporary upstream error", + })); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + cronConfig: { + failureAlert: { + enabled: true, + after: 1, + mode: "webhook", + accountId: "global-account", + }, + }, + log: noopLogger, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob, + sendCronFailureAlert, + }); + + await cron.start(); + const normalJob = await cron.add({ + name: "normal alert job", + enabled: true, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "run report" }, + delivery: { mode: "announce", channel: "telegram", to: "19098680" }, + }); + const bestEffortJob = await cron.add({ + name: "best effort alert job", + enabled: true, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "run report" }, + delivery: { + mode: "announce", + channel: "telegram", + to: "19098680", + bestEffort: true, + }, + }); + + await cron.run(normalJob.id, "force"); + expect(sendCronFailureAlert).toHaveBeenCalledTimes(1); + expect(sendCronFailureAlert).toHaveBeenCalledWith( + expect.objectContaining({ + mode: "webhook", + accountId: "global-account", + to: undefined, + }), + ); + + await cron.run(bestEffortJob.id, "force"); + expect(sendCronFailureAlert).toHaveBeenCalledTimes(1); + + cron.stop(); + await store.cleanup(); + }); }); diff --git a/src/cron/service.jobs.test.ts b/src/cron/service.jobs.test.ts index 4d0819ab906..9bd31726f91 100644 --- a/src/cron/service.jobs.test.ts +++ b/src/cron/service.jobs.test.ts @@ -222,6 +222,51 @@ describe("applyJobPatch", () => { expect(job.delivery).toEqual({ mode: "webhook", to: "https://example.invalid/trim" }); }); + it("rejects failureDestination on main jobs without webhook delivery mode", () => { + const job = createMainSystemEventJob("job-main-failure-dest", { + mode: "announce", + channel: "telegram", + to: "123", + failureDestination: { + mode: "announce", + channel: "telegram", + to: "999", + }, + }); + + expect(() => applyJobPatch(job, { enabled: true })).toThrow( + 'cron delivery.failureDestination is only supported for sessionTarget="isolated" unless delivery.mode="webhook"', + ); + }); + + it("validates and trims webhook failureDestination target URLs", () => { + const expectedError = + "cron failure destination webhook requires delivery.failureDestination.to to be a valid http(s) URL"; + const job = createIsolatedAgentTurnJob("job-failure-webhook-target", { + mode: "announce", + channel: "telegram", + to: "123", + failureDestination: { + mode: "webhook", + to: "not-a-url", + }, + }); + + expect(() => applyJobPatch(job, { enabled: true })).toThrow(expectedError); + + job.delivery = { + mode: "announce", + channel: "telegram", + to: "123", + failureDestination: { + mode: "webhook", + to: " https://example.invalid/failure ", + }, + }; + expect(() => applyJobPatch(job, { enabled: true })).not.toThrow(); + expect(job.delivery?.failureDestination?.to).toBe("https://example.invalid/failure"); + }); + it("rejects Telegram delivery with invalid target (chatId/topicId format)", () => { const job = createIsolatedAgentTurnJob("job-telegram-invalid", { mode: "announce", @@ -365,6 +410,25 @@ describe("createJob rejects sessionTarget main for non-default agents", () => { }), ).not.toThrow(); }); + + it("rejects failureDestination on main jobs without webhook delivery mode", () => { + const state = createMockState(now, { defaultAgentId: "main" }); + expect(() => + createJob(state, { + ...mainJobInput("main"), + delivery: { + mode: "announce", + channel: "telegram", + to: "123", + failureDestination: { + mode: "announce", + channel: "signal", + to: "+15550001111", + }, + }, + }), + ).toThrow('cron channel delivery config is only supported for sessionTarget="isolated"'); + }); }); describe("applyJobPatch rejects sessionTarget main for non-default agents", () => { diff --git a/src/cron/service/jobs.ts b/src/cron/service/jobs.ts index ed5f57d78af..56c30c338c0 100644 --- a/src/cron/service/jobs.ts +++ b/src/cron/service/jobs.ts @@ -151,6 +151,27 @@ function assertDeliverySupport(job: Pick) } } +function assertFailureDestinationSupport(job: Pick) { + const failureDestination = job.delivery?.failureDestination; + if (!failureDestination) { + return; + } + if (job.sessionTarget === "main" && job.delivery?.mode !== "webhook") { + throw new Error( + 'cron delivery.failureDestination is only supported for sessionTarget="isolated" unless delivery.mode="webhook"', + ); + } + if (failureDestination.mode === "webhook") { + const target = normalizeHttpWebhookUrl(failureDestination.to); + if (!target) { + throw new Error( + "cron failure destination webhook requires delivery.failureDestination.to to be a valid http(s) URL", + ); + } + failureDestination.to = target; + } +} + export function findJobOrThrow(state: CronServiceState, id: string) { const job = state.store?.jobs.find((j) => j.id === id); if (!job) { @@ -452,6 +473,7 @@ export function createJob(state: CronServiceState, input: CronJobCreate): CronJo assertSupportedJobSpec(job); assertMainSessionAgentId(job, state.deps.defaultAgentId); assertDeliverySupport(job); + assertFailureDestinationSupport(job); job.state.nextRunAtMs = computeJobNextRunAtMs(job, now); return job; } @@ -517,6 +539,15 @@ export function applyJobPatch( if ("failureAlert" in patch) { job.failureAlert = mergeCronFailureAlert(job.failureAlert, patch.failureAlert); } + if ( + job.sessionTarget === "main" && + job.delivery?.mode !== "webhook" && + job.delivery?.failureDestination + ) { + throw new Error( + 'cron delivery.failureDestination is only supported for sessionTarget="isolated" unless delivery.mode="webhook"', + ); + } if (job.sessionTarget === "main" && job.delivery?.mode !== "webhook") { job.delivery = undefined; } @@ -532,6 +563,7 @@ export function applyJobPatch( assertSupportedJobSpec(job); assertMainSessionAgentId(job, opts?.defaultAgentId); assertDeliverySupport(job); + assertFailureDestinationSupport(job); } function mergeCronPayload(existing: CronPayload, patch: CronPayloadPatch): CronPayload { @@ -668,6 +700,7 @@ function mergeCronDelivery( to: existing?.to, accountId: existing?.accountId, bestEffort: existing?.bestEffort, + failureDestination: existing?.failureDestination, }; if (typeof patch.mode === "string") { @@ -685,6 +718,39 @@ function mergeCronDelivery( if (typeof patch.bestEffort === "boolean") { next.bestEffort = patch.bestEffort; } + if ("failureDestination" in patch) { + if (patch.failureDestination === undefined) { + next.failureDestination = undefined; + } else { + const existingFd = next.failureDestination; + const patchFd = patch.failureDestination; + const nextFd: typeof next.failureDestination = { + channel: existingFd?.channel, + to: existingFd?.to, + accountId: existingFd?.accountId, + mode: existingFd?.mode, + }; + if (patchFd) { + if ("channel" in patchFd) { + const channel = typeof patchFd.channel === "string" ? patchFd.channel.trim() : ""; + nextFd.channel = channel ? channel : undefined; + } + if ("to" in patchFd) { + const to = typeof patchFd.to === "string" ? patchFd.to.trim() : ""; + nextFd.to = to ? to : undefined; + } + if ("accountId" in patchFd) { + const accountId = typeof patchFd.accountId === "string" ? patchFd.accountId.trim() : ""; + nextFd.accountId = accountId ? accountId : undefined; + } + if ("mode" in patchFd) { + const mode = typeof patchFd.mode === "string" ? patchFd.mode.trim() : ""; + nextFd.mode = mode === "announce" || mode === "webhook" ? mode : undefined; + } + } + next.failureDestination = nextFd; + } + } return next; } @@ -719,6 +785,14 @@ function mergeCronFailureAlert( : -1; next.cooldownMs = cooldownMs >= 0 ? Math.floor(cooldownMs) : undefined; } + if ("mode" in patch) { + const mode = typeof patch.mode === "string" ? patch.mode.trim() : ""; + next.mode = mode === "announce" || mode === "webhook" ? mode : undefined; + } + if ("accountId" in patch) { + const accountId = typeof patch.accountId === "string" ? patch.accountId.trim() : ""; + next.accountId = accountId ? accountId : undefined; + } return next; } diff --git a/src/cron/service/state.ts b/src/cron/service/state.ts index 05adbafb274..b65d0ebaa14 100644 --- a/src/cron/service/state.ts +++ b/src/cron/service/state.ts @@ -96,6 +96,8 @@ export type CronServiceDeps = { text: string; channel: CronMessageChannel; to?: string; + mode?: "announce" | "webhook"; + accountId?: string; }) => Promise; onEvent?: (evt: CronEvent) => void; }; diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 9bf9d894e1e..3190ccbb45b 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -188,7 +188,14 @@ function clampNonNegativeInt(value: unknown, fallback: number): number { function resolveFailureAlert( state: CronServiceState, job: CronJob, -): { after: number; cooldownMs: number; channel: CronMessageChannel; to?: string } | null { +): { + after: number; + cooldownMs: number; + channel: CronMessageChannel; + to?: string; + mode?: "announce" | "webhook"; + accountId?: string; +} | null { const globalConfig = state.deps.cronConfig?.failureAlert; const jobConfig = job.failureAlert === false ? undefined : job.failureAlert; @@ -199,6 +206,9 @@ function resolveFailureAlert( return null; } + const mode = jobConfig?.mode ?? globalConfig?.mode; + const explicitTo = normalizeTo(jobConfig?.to); + return { after: clampPositiveInt(jobConfig?.after ?? globalConfig?.after, DEFAULT_FAILURE_ALERT_AFTER), cooldownMs: clampNonNegativeInt( @@ -209,7 +219,9 @@ function resolveFailureAlert( normalizeCronMessageChannel(jobConfig?.channel) ?? normalizeCronMessageChannel(job.delivery?.channel) ?? "last", - to: normalizeTo(jobConfig?.to) ?? normalizeTo(job.delivery?.to), + to: mode === "webhook" ? explicitTo : (explicitTo ?? normalizeTo(job.delivery?.to)), + mode, + accountId: jobConfig?.accountId ?? globalConfig?.accountId, }; } @@ -221,6 +233,8 @@ function emitFailureAlert( consecutiveErrors: number; channel: CronMessageChannel; to?: string; + mode?: "announce" | "webhook"; + accountId?: string; }, ) { const safeJobName = params.job.name || params.job.id; @@ -237,6 +251,8 @@ function emitFailureAlert( text, channel: params.channel, to: params.to, + mode: params.mode, + accountId: params.accountId, }) .catch((err) => { state.deps.log.warn( @@ -287,19 +303,26 @@ export function applyJobResult( job.state.consecutiveErrors = (job.state.consecutiveErrors ?? 0) + 1; const alertConfig = resolveFailureAlert(state, job); if (alertConfig && job.state.consecutiveErrors >= alertConfig.after) { - const now = state.deps.nowMs(); - const lastAlert = job.state.lastFailureAlertAtMs; - const inCooldown = - typeof lastAlert === "number" && now - lastAlert < Math.max(0, alertConfig.cooldownMs); - if (!inCooldown) { - emitFailureAlert(state, { - job, - error: result.error, - consecutiveErrors: job.state.consecutiveErrors, - channel: alertConfig.channel, - to: alertConfig.to, - }); - job.state.lastFailureAlertAtMs = now; + const isBestEffort = + job.delivery?.bestEffort === true || + (job.payload.kind === "agentTurn" && job.payload.bestEffortDeliver === true); + if (!isBestEffort) { + const now = state.deps.nowMs(); + const lastAlert = job.state.lastFailureAlertAtMs; + const inCooldown = + typeof lastAlert === "number" && now - lastAlert < Math.max(0, alertConfig.cooldownMs); + if (!inCooldown) { + emitFailureAlert(state, { + job, + error: result.error, + consecutiveErrors: job.state.consecutiveErrors, + channel: alertConfig.channel, + to: alertConfig.to, + mode: alertConfig.mode, + accountId: alertConfig.accountId, + }); + job.state.lastFailureAlertAtMs = now; + } } } } else { diff --git a/src/cron/types.ts b/src/cron/types.ts index 3d089b40f98..1010f4b7682 100644 --- a/src/cron/types.ts +++ b/src/cron/types.ts @@ -25,6 +25,15 @@ export type CronDelivery = { /** Explicit channel account id for multi-account setups (e.g. multiple Telegram bots). */ accountId?: string; bestEffort?: boolean; + /** Separate destination for failure notifications. */ + failureDestination?: CronFailureDestination; +}; + +export type CronFailureDestination = { + channel?: CronMessageChannel; + to?: string; + accountId?: string; + mode?: "announce" | "webhook"; }; export type CronDeliveryPatch = Partial; @@ -61,6 +70,10 @@ export type CronFailureAlert = { channel?: CronMessageChannel; to?: string; cooldownMs?: number; + /** Delivery mode: announce (via messaging channels) or webhook (HTTP POST). */ + mode?: "announce" | "webhook"; + /** Account ID for multi-account channel configurations. */ + accountId?: string; }; export type CronPayload = diff --git a/src/gateway/protocol/schema/cron.ts b/src/gateway/protocol/schema/cron.ts index b4ca4fee17e..41e7467bece 100644 --- a/src/gateway/protocol/schema/cron.ts +++ b/src/gateway/protocol/schema/cron.ts @@ -138,10 +138,33 @@ export const CronPayloadPatchSchema = Type.Union([ cronAgentTurnPayloadSchema({ message: Type.Optional(NonEmptyString) }), ]); +export const CronFailureAlertSchema = Type.Object( + { + after: Type.Optional(Type.Integer({ minimum: 1 })), + channel: Type.Optional(Type.Union([Type.Literal("last"), NonEmptyString])), + to: Type.Optional(Type.String()), + cooldownMs: Type.Optional(Type.Integer({ minimum: 0 })), + mode: Type.Optional(Type.Union([Type.Literal("announce"), Type.Literal("webhook")])), + accountId: Type.Optional(NonEmptyString), + }, + { additionalProperties: false }, +); + +export const CronFailureDestinationSchema = Type.Object( + { + channel: Type.Optional(Type.Union([Type.Literal("last"), NonEmptyString])), + to: Type.Optional(Type.String()), + accountId: Type.Optional(NonEmptyString), + mode: Type.Optional(Type.Union([Type.Literal("announce"), Type.Literal("webhook")])), + }, + { additionalProperties: false }, +); + const CronDeliverySharedProperties = { channel: Type.Optional(Type.Union([Type.Literal("last"), NonEmptyString])), accountId: Type.Optional(NonEmptyString), bestEffort: Type.Optional(Type.Boolean()), + failureDestination: Type.Optional(CronFailureDestinationSchema), }; const CronDeliveryNoopSchema = Type.Object( @@ -188,16 +211,6 @@ export const CronDeliveryPatchSchema = Type.Object( { additionalProperties: false }, ); -export const CronFailureAlertSchema = Type.Object( - { - after: Type.Optional(Type.Integer({ minimum: 1 })), - channel: Type.Optional(Type.Union([Type.Literal("last"), NonEmptyString])), - to: Type.Optional(Type.String()), - cooldownMs: Type.Optional(Type.Integer({ minimum: 0 })), - }, - { additionalProperties: false }, -); - export const CronJobStateSchema = Type.Object( { nextRunAtMs: Type.Optional(Type.Integer({ minimum: 0 })), diff --git a/src/gateway/server-cron.ts b/src/gateway/server-cron.ts index 72cf2a2794a..0704b7dc2c0 100644 --- a/src/gateway/server-cron.ts +++ b/src/gateway/server-cron.ts @@ -8,6 +8,7 @@ import { resolveAgentMainSessionKey, } from "../config/sessions.js"; import { resolveStorePath } from "../config/sessions/paths.js"; +import { resolveFailureDestination, sendFailureNotificationAnnounce } from "../cron/delivery.js"; import { runCronIsolatedAgentTurn } from "../cron/isolated-agent.js"; import { resolveDeliveryTarget } from "../cron/isolated-agent/delivery-target.js"; import { @@ -72,6 +73,66 @@ function resolveCronWebhookTarget(params: { return null; } +function buildCronWebhookHeaders(webhookToken?: string): Record { + const headers: Record = { + "Content-Type": "application/json", + }; + if (webhookToken) { + headers.Authorization = `Bearer ${webhookToken}`; + } + return headers; +} + +async function postCronWebhook(params: { + webhookUrl: string; + webhookToken?: string; + payload: unknown; + logContext: Record; + blockedLog: string; + failedLog: string; + logger: ReturnType; +}): Promise { + const abortController = new AbortController(); + const timeout = setTimeout(() => { + abortController.abort(); + }, CRON_WEBHOOK_TIMEOUT_MS); + + try { + const result = await fetchWithSsrFGuard({ + url: params.webhookUrl, + init: { + method: "POST", + headers: buildCronWebhookHeaders(params.webhookToken), + body: JSON.stringify(params.payload), + signal: abortController.signal, + }, + }); + await result.release(); + } catch (err) { + if (err instanceof SsrFBlockedError) { + params.logger.warn( + { + ...params.logContext, + reason: formatErrorMessage(err), + webhookUrl: redactWebhookUrl(params.webhookUrl), + }, + params.blockedLog, + ); + } else { + params.logger.warn( + { + ...params.logContext, + err: formatErrorMessage(err), + webhookUrl: redactWebhookUrl(params.webhookUrl), + }, + params.failedLog, + ); + } + } finally { + clearTimeout(timeout); + } +} + export function buildGatewayCronService(params: { cfg: ReturnType; deps: CliDeps; @@ -226,11 +287,51 @@ export function buildGatewayCronService(params: { lane: "cron", }); }, - sendCronFailureAlert: async ({ job, text, channel, to }) => { + sendCronFailureAlert: async ({ job, text, channel, to, mode, accountId }) => { const { agentId, cfg: runtimeConfig } = resolveCronAgent(job.agentId); + const webhookToken = params.cfg.cron?.webhookToken?.trim(); + + // Webhook mode requires a URL - fail closed if missing + if (mode === "webhook" && !to) { + cronLogger.warn( + { jobId: job.id }, + "cron: failure alert webhook mode requires URL, skipping", + ); + return; + } + + if (mode === "webhook" && to) { + const webhookUrl = normalizeHttpWebhookUrl(to); + if (webhookUrl) { + await postCronWebhook({ + webhookUrl, + webhookToken, + payload: { + jobId: job.id, + jobName: job.name, + message: text, + }, + logContext: { jobId: job.id }, + blockedLog: "cron: failure alert webhook blocked by SSRF guard", + failedLog: "cron: failure alert webhook failed", + logger: cronLogger, + }); + } else { + cronLogger.warn( + { + jobId: job.id, + webhookUrl: redactWebhookUrl(to), + }, + "cron: failure alert webhook URL is invalid, skipping", + ); + } + return; + } + const target = await resolveDeliveryTarget(runtimeConfig, agentId, { channel, to, + accountId, }); if (!target.ok) { throw target.error; @@ -284,54 +385,81 @@ export function buildGatewayCronService(params: { } if (webhookTarget && evt.summary) { - const headers: Record = { - "Content-Type": "application/json", - }; - if (webhookToken) { - headers.Authorization = `Bearer ${webhookToken}`; - } - const abortController = new AbortController(); - const timeout = setTimeout(() => { - abortController.abort(); - }, CRON_WEBHOOK_TIMEOUT_MS); - void (async () => { - try { - const result = await fetchWithSsrFGuard({ - url: webhookTarget.url, - init: { - method: "POST", - headers, - body: JSON.stringify(evt), - signal: abortController.signal, - }, - }); - await result.release(); - } catch (err) { - if (err instanceof SsrFBlockedError) { - cronLogger.warn( - { - reason: formatErrorMessage(err), - jobId: evt.jobId, - webhookUrl: redactWebhookUrl(webhookTarget.url), - }, - "cron: webhook delivery blocked by SSRF guard", - ); - } else { - cronLogger.warn( - { - err: formatErrorMessage(err), - jobId: evt.jobId, - webhookUrl: redactWebhookUrl(webhookTarget.url), - }, - "cron: webhook delivery failed", - ); - } - } finally { - clearTimeout(timeout); - } + await postCronWebhook({ + webhookUrl: webhookTarget.url, + webhookToken, + payload: evt, + logContext: { jobId: evt.jobId }, + blockedLog: "cron: webhook delivery blocked by SSRF guard", + failedLog: "cron: webhook delivery failed", + logger: cronLogger, + }); })(); } + + if (evt.status === "error" && job) { + const failureDest = resolveFailureDestination(job, params.cfg.cron?.failureDestination); + if (failureDest) { + const isBestEffort = + job.delivery?.bestEffort === true || + (job.payload.kind === "agentTurn" && job.payload.bestEffortDeliver === true); + + if (!isBestEffort) { + const failureMessage = `Cron job "${job.name}" failed: ${evt.error ?? "unknown error"}`; + const failurePayload = { + jobId: job.id, + jobName: job.name, + message: failureMessage, + status: evt.status, + error: evt.error, + runAtMs: evt.runAtMs, + durationMs: evt.durationMs, + nextRunAtMs: evt.nextRunAtMs, + }; + + if (failureDest.mode === "webhook" && failureDest.to) { + const webhookUrl = normalizeHttpWebhookUrl(failureDest.to); + if (webhookUrl) { + void (async () => { + await postCronWebhook({ + webhookUrl, + webhookToken, + payload: failurePayload, + logContext: { jobId: evt.jobId }, + blockedLog: "cron: failure destination webhook blocked by SSRF guard", + failedLog: "cron: failure destination webhook failed", + logger: cronLogger, + }); + })(); + } else { + cronLogger.warn( + { + jobId: evt.jobId, + webhookUrl: redactWebhookUrl(failureDest.to), + }, + "cron: failure destination webhook URL is invalid, skipping", + ); + } + } else if (failureDest.mode === "announce") { + const { agentId, cfg: runtimeConfig } = resolveCronAgent(job.agentId); + void sendFailureNotificationAnnounce( + params.deps, + runtimeConfig, + agentId, + job.id, + { + channel: failureDest.channel, + to: failureDest.to, + accountId: failureDest.accountId, + }, + `[Cron Failure] ${failureMessage}`, + ); + } + } + } + } + const logPath = resolveCronRunLogPath({ storePath, jobId: evt.jobId, diff --git a/src/gateway/server.cron.test.ts b/src/gateway/server.cron.test.ts index 959c8365228..4ee5f4d1a8d 100644 --- a/src/gateway/server.cron.test.ts +++ b/src/gateway/server.cron.test.ts @@ -644,6 +644,58 @@ describe("gateway server cron", () => { await yieldToEventLoop(); expect(fetchWithSsrFGuardMock).toHaveBeenCalledTimes(2); + fetchWithSsrFGuardMock.mockClear(); + cronIsolatedRun.mockResolvedValueOnce({ status: "error", summary: "delivery failed" }); + const failureDestRes = await rpcReq(ws, "cron.add", { + name: "failure destination webhook", + enabled: true, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "test" }, + delivery: { + mode: "announce", + channel: "telegram", + to: "19098680", + failureDestination: { + mode: "webhook", + to: "https://example.invalid/failure-destination", + }, + }, + }); + expect(failureDestRes.ok).toBe(true); + const failureDestJobIdValue = (failureDestRes.payload as { id?: unknown } | null)?.id; + const failureDestJobId = + typeof failureDestJobIdValue === "string" ? failureDestJobIdValue : ""; + expect(failureDestJobId.length > 0).toBe(true); + + const failureDestRunRes = await rpcReq( + ws, + "cron.run", + { id: failureDestJobId, mode: "force" }, + 20_000, + ); + expect(failureDestRunRes.ok).toBe(true); + await waitForCondition( + () => fetchWithSsrFGuardMock.mock.calls.length === 1, + CRON_WAIT_TIMEOUT_MS, + ); + const [failureDestArgs] = fetchWithSsrFGuardMock.mock.calls[0] as unknown as [ + { + url?: string; + init?: { + method?: string; + headers?: Record; + body?: string; + }; + }, + ]; + expect(failureDestArgs.url).toBe("https://example.invalid/failure-destination"); + const failureDestBody = JSON.parse(failureDestArgs.init?.body ?? "{}"); + expect(failureDestBody.message).toBe( + 'Cron job "failure destination webhook" failed: unknown error', + ); + cronIsolatedRun.mockResolvedValueOnce({ status: "ok", summary: "" }); const noSummaryRes = await rpcReq(ws, "cron.add", { name: "webhook no summary", @@ -668,7 +720,7 @@ describe("gateway server cron", () => { expect(noSummaryRunRes.ok).toBe(true); await yieldToEventLoop(); await yieldToEventLoop(); - expect(fetchWithSsrFGuardMock).toHaveBeenCalledTimes(2); + expect(fetchWithSsrFGuardMock).toHaveBeenCalledTimes(1); } finally { await cleanupCronTestRun({ ws, server, dir, prevSkipCron }); } diff --git a/ui/src/ui/app-defaults.ts b/ui/src/ui/app-defaults.ts index 451e95b4ee4..fa8eff7012c 100644 --- a/ui/src/ui/app-defaults.ts +++ b/ui/src/ui/app-defaults.ts @@ -44,5 +44,7 @@ export const DEFAULT_CRON_FORM: CronFormState = { failureAlertCooldownSeconds: "3600", failureAlertChannel: "last", failureAlertTo: "", + failureAlertDeliveryMode: "announce", + failureAlertAccountId: "", timeoutSeconds: "", }; diff --git a/ui/src/ui/controllers/cron.test.ts b/ui/src/ui/controllers/cron.test.ts index d3f6b715770..11a32981635 100644 --- a/ui/src/ui/controllers/cron.test.ts +++ b/ui/src/ui/controllers/cron.test.ts @@ -630,6 +630,52 @@ describe("cron controller", () => { cooldownMs: 120_000, channel: "telegram", to: "123456", + mode: "announce", + accountId: undefined, + }, + }, + }); + }); + + it("includes failure alert mode/accountId in cron.update patch", async () => { + const request = vi.fn(async (method: string, _payload?: unknown) => { + if (method === "cron.update") { + return { id: "job-alert-mode" }; + } + if (method === "cron.list") { + return { jobs: [{ id: "job-alert-mode" }] }; + } + if (method === "cron.status") { + return { enabled: true, jobs: 1, nextWakeAtMs: null }; + } + return {}; + }); + const state = createState({ + client: { request } as unknown as CronState["client"], + cronEditingJobId: "job-alert-mode", + cronForm: { + ...DEFAULT_CRON_FORM, + name: "alert mode job", + payloadKind: "agentTurn", + payloadText: "run it", + failureAlertMode: "custom", + failureAlertAfter: "1", + failureAlertDeliveryMode: "webhook", + failureAlertAccountId: "bot-a", + }, + }); + + await addCronJob(state); + + const updateCall = request.mock.calls.find(([method]) => method === "cron.update"); + expect(updateCall).toBeDefined(); + expect(updateCall?.[1]).toMatchObject({ + id: "job-alert-mode", + patch: { + failureAlert: { + after: 1, + mode: "webhook", + accountId: "bot-a", }, }, }); @@ -780,6 +826,8 @@ describe("cron controller", () => { expect(state.cronForm.failureAlertCooldownSeconds).toBe("30"); expect(state.cronForm.failureAlertChannel).toBe("telegram"); expect(state.cronForm.failureAlertTo).toBe("999"); + expect(state.cronForm.failureAlertDeliveryMode).toBe("announce"); + expect(state.cronForm.failureAlertAccountId).toBe(""); }); it("validates key cron form errors", () => { diff --git a/ui/src/ui/controllers/cron.ts b/ui/src/ui/controllers/cron.ts index 151f62a6893..c81d69c57ea 100644 --- a/ui/src/ui/controllers/cron.ts +++ b/ui/src/ui/controllers/cron.ts @@ -481,6 +481,12 @@ function jobToForm(job: CronJob, prev: CronFormState): CronFormState { ? (failureAlert.channel ?? CRON_CHANNEL_LAST) : CRON_CHANNEL_LAST, failureAlertTo: failureAlert && typeof failureAlert === "object" ? (failureAlert.to ?? "") : "", + failureAlertDeliveryMode: + failureAlert && typeof failureAlert === "object" + ? (failureAlert.mode ?? "announce") + : "announce", + failureAlertAccountId: + failureAlert && typeof failureAlert === "object" ? (failureAlert.accountId ?? "") : "", timeoutSeconds: job.payload.kind === "agentTurn" && typeof job.payload.timeoutSeconds === "number" ? String(job.payload.timeoutSeconds) @@ -593,12 +599,21 @@ function buildFailureAlert(form: CronFormState) { cooldownSeconds !== undefined && Number.isFinite(cooldownSeconds) && cooldownSeconds >= 0 ? Math.floor(cooldownSeconds * 1000) : undefined; - return { + const deliveryMode = form.failureAlertDeliveryMode; + const accountId = form.failureAlertAccountId.trim(); + const patch: Record = { after: after > 0 ? Math.floor(after) : undefined, channel: form.failureAlertChannel.trim() || CRON_CHANNEL_LAST, to: form.failureAlertTo.trim() || undefined, ...(cooldownMs !== undefined ? { cooldownMs } : {}), }; + // Always include mode and accountId so users can switch/clear them + if (deliveryMode) { + patch.mode = deliveryMode; + } + // Include accountId if explicitly set, or send undefined to allow clearing + patch.accountId = accountId || undefined; + return patch; } export async function addCronJob(state: CronState) { diff --git a/ui/src/ui/types.ts b/ui/src/ui/types.ts index 943a0ebf465..9c6ff164308 100644 --- a/ui/src/ui/types.ts +++ b/ui/src/ui/types.ts @@ -479,6 +479,14 @@ export type CronDelivery = { to?: string; accountId?: string; bestEffort?: boolean; + failureDestination?: CronFailureDestination; +}; + +export type CronFailureDestination = { + channel?: string; + to?: string; + mode?: "announce" | "webhook"; + accountId?: string; }; export type CronFailureAlert = { @@ -486,6 +494,8 @@ export type CronFailureAlert = { channel?: string; to?: string; cooldownMs?: number; + mode?: "announce" | "webhook"; + accountId?: string; }; export type CronJobState = { diff --git a/ui/src/ui/ui-types.ts b/ui/src/ui/ui-types.ts index 2b837067ee6..b5c2a3b09bf 100644 --- a/ui/src/ui/ui-types.ts +++ b/ui/src/ui/ui-types.ts @@ -48,5 +48,7 @@ export type CronFormState = { failureAlertCooldownSeconds: string; failureAlertChannel: string; failureAlertTo: string; + failureAlertDeliveryMode: "announce" | "webhook"; + failureAlertAccountId: string; timeoutSeconds: string; }; diff --git a/ui/src/ui/views/cron.ts b/ui/src/ui/views/cron.ts index 2907f7f6cd4..296a692d115 100644 --- a/ui/src/ui/views/cron.ts +++ b/ui/src/ui/views/cron.ts @@ -1279,6 +1279,31 @@ export function renderCron(props: CronProps) { Optional recipient override for failure alerts. + + ` : nothing }