From 85148f3b2099fe62fd3b87b1ab6cea06c5064f42 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 27 Apr 2026 06:44:48 +0100 Subject: [PATCH] refactor(cron): split notification routing --- src/cron/delivery.ts | 148 +++++++--- src/cron/session-target.test.ts | 48 +++ src/cron/session-target.ts | 20 ++ src/gateway/server-cron-notifications.ts | 358 +++++++++++++++++++++++ src/gateway/server-cron.ts | 332 ++------------------- 5 files changed, 565 insertions(+), 341 deletions(-) create mode 100644 src/cron/session-target.test.ts create mode 100644 src/gateway/server-cron-notifications.ts diff --git a/src/cron/delivery.ts b/src/cron/delivery.ts index cec0e8a3d2b..d59e2f6fe26 100644 --- a/src/cron/delivery.ts +++ b/src/cron/delivery.ts @@ -13,7 +13,11 @@ import { type CronDeliveryPlan, resolveCronDeliveryPlan, } from "./delivery-plan.js"; -import { resolveDeliveryTarget } from "./isolated-agent/delivery-target.js"; +import { + resolveDeliveryTarget, + type DeliveryTargetResolution, +} from "./isolated-agent/delivery-target.js"; +import { resolveCronNotificationSessionKey } from "./session-target.js"; import type { CronMessageChannel } from "./types.js"; export { @@ -27,65 +31,143 @@ export { const FAILURE_NOTIFICATION_TIMEOUT_MS = 30_000; const cronDeliveryLogger = getChildLogger({ subsystem: "cron-delivery" }); +export type CronAnnounceTarget = { + channel?: string; + to?: string; + accountId?: string; + sessionKey?: string; +}; + +type SuccessfulDeliveryTarget = Extract; + +async function resolveCronAnnounceDelivery(params: { + cfg: OpenClawConfig; + agentId: string; + jobId: string; + target: CronAnnounceTarget; +}): Promise< + | { + ok: true; + resolvedTarget: SuccessfulDeliveryTarget; + session: ReturnType; + identity: ReturnType; + } + | { ok: false; error: Error } +> { + const resolvedTarget = await resolveDeliveryTarget(params.cfg, params.agentId, { + channel: params.target.channel as CronMessageChannel | undefined, + to: params.target.to, + accountId: params.target.accountId, + sessionKey: params.target.sessionKey, + }); + + if (!resolvedTarget.ok) { + return { ok: false, error: resolvedTarget.error }; + } + + const identity = resolveAgentOutboundIdentity(params.cfg, params.agentId); + const session = buildOutboundSessionContext({ + cfg: params.cfg, + agentId: params.agentId, + sessionKey: resolveCronNotificationSessionKey({ + jobId: params.jobId, + sessionKey: params.target.sessionKey, + }), + }); + + return { + ok: true, + resolvedTarget, + session, + identity, + }; +} + +async function deliverCronAnnouncePayload(params: { + deps: CliDeps; + cfg: OpenClawConfig; + delivery: { + resolvedTarget: SuccessfulDeliveryTarget; + session: ReturnType; + identity: ReturnType; + }; + message: string; + abortSignal: AbortSignal; +}): Promise { + await deliverOutboundPayloads({ + cfg: params.cfg, + channel: params.delivery.resolvedTarget.channel, + to: params.delivery.resolvedTarget.to, + accountId: params.delivery.resolvedTarget.accountId, + threadId: params.delivery.resolvedTarget.threadId, + payloads: [{ text: params.message }], + session: params.delivery.session, + identity: params.delivery.identity, + bestEffort: false, + deps: createOutboundSendDeps(params.deps), + abortSignal: params.abortSignal, + }); +} + +export async function sendCronAnnouncePayloadStrict(params: { + deps: CliDeps; + cfg: OpenClawConfig; + agentId: string; + jobId: string; + target: CronAnnounceTarget; + message: string; + abortSignal: AbortSignal; +}): Promise { + const delivery = await resolveCronAnnounceDelivery(params); + if (!delivery.ok) { + throw delivery.error; + } + await deliverCronAnnouncePayload({ + deps: params.deps, + cfg: params.cfg, + delivery, + message: params.message, + abortSignal: params.abortSignal, + }); +} + export async function sendFailureNotificationAnnounce( deps: CliDeps, cfg: OpenClawConfig, agentId: string, jobId: string, - target: { channel?: string; to?: string; accountId?: string; sessionKey?: string }, + target: CronAnnounceTarget, message: string, ): Promise { - const resolvedTarget = await resolveDeliveryTarget(cfg, agentId, { - channel: target.channel as CronMessageChannel | undefined, - to: target.to, - accountId: target.accountId, - sessionKey: target.sessionKey, - }); + const delivery = await resolveCronAnnounceDelivery({ cfg, agentId, jobId, target }); - if (!resolvedTarget.ok) { + if (!delivery.ok) { cronDeliveryLogger.warn( - { error: resolvedTarget.error.message }, + { error: delivery.error.message }, "cron: failed to resolve failure destination target", ); return; } - const identity = resolveAgentOutboundIdentity(cfg, agentId); - const deliverySessionKey = - typeof target.sessionKey === "string" && target.sessionKey.trim() - ? target.sessionKey.trim() - : `cron:${jobId}:failure`; - const session = buildOutboundSessionContext({ - cfg, - agentId, - sessionKey: deliverySessionKey, - }); - const abortController = new AbortController(); const timeout = setTimeout(() => { abortController.abort(); }, FAILURE_NOTIFICATION_TIMEOUT_MS); try { - await deliverOutboundPayloads({ + await deliverCronAnnouncePayload({ + deps, cfg, - channel: resolvedTarget.channel, - to: resolvedTarget.to, - accountId: resolvedTarget.accountId, - threadId: resolvedTarget.threadId, - payloads: [{ text: message }], - session, - identity, - bestEffort: false, - deps: createOutboundSendDeps(deps), + delivery, + message, abortSignal: abortController.signal, }); } catch (err) { cronDeliveryLogger.warn( { err: formatErrorMessage(err), - channel: resolvedTarget.channel, - to: resolvedTarget.to, + channel: delivery.resolvedTarget.channel, + to: delivery.resolvedTarget.to, }, "cron: failure destination announce failed", ); diff --git a/src/cron/session-target.test.ts b/src/cron/session-target.test.ts new file mode 100644 index 00000000000..12d5f7f41cc --- /dev/null +++ b/src/cron/session-target.test.ts @@ -0,0 +1,48 @@ +import { describe, expect, it } from "vitest"; +import { + resolveCronDeliverySessionKey, + resolveCronFailureNotificationSessionKey, + resolveCronNotificationSessionKey, + resolveCronSessionTargetSessionKey, +} from "./session-target.js"; + +describe("cron session target helpers", () => { + it("extracts and trims persistent session targets", () => { + expect(resolveCronSessionTargetSessionKey("session: agent:main:telegram:direct:123 ")).toBe( + "agent:main:telegram:direct:123", + ); + }); + + it("rejects unsafe persistent session targets", () => { + expect(() => resolveCronSessionTargetSessionKey("session:../../outside")).toThrow( + "invalid cron sessionTarget session id", + ); + }); + + it("prefers sessionTarget over creator sessionKey for delivery", () => { + expect( + resolveCronDeliverySessionKey({ + sessionTarget: "session:agent:main:telegram:direct:123", + sessionKey: "agent:main:telegram:group:ops:sender:123", + }), + ).toBe("agent:main:telegram:direct:123"); + }); + + it("falls back to trimmed creator sessionKey for delivery", () => { + expect( + resolveCronDeliverySessionKey({ + sessionTarget: "isolated", + sessionKey: " agent:main:telegram:group:ops:sender:123 ", + }), + ).toBe("agent:main:telegram:group:ops:sender:123"); + }); + + it("uses cron failure session fallback when no delivery session exists", () => { + expect(resolveCronNotificationSessionKey({ jobId: "job-1", sessionKey: " " })).toBe( + "cron:job-1:failure", + ); + expect( + resolveCronFailureNotificationSessionKey({ id: "job-2", sessionTarget: "isolated" }), + ).toBe("cron:job-2:failure"); + }); +}); diff --git a/src/cron/session-target.ts b/src/cron/session-target.ts index f742818cad4..ecf2877bd3f 100644 --- a/src/cron/session-target.ts +++ b/src/cron/session-target.ts @@ -36,3 +36,23 @@ export function resolveCronDeliverySessionKey(job: { ? job.sessionKey.trim() : undefined; } + +export function resolveCronNotificationSessionKey(params: { + jobId: string; + sessionKey?: string | null; +}): string { + return typeof params.sessionKey === "string" && params.sessionKey.trim() + ? params.sessionKey.trim() + : `cron:${params.jobId}:failure`; +} + +export function resolveCronFailureNotificationSessionKey(job: { + id: string; + sessionTarget?: string | null; + sessionKey?: string | null; +}): string { + return resolveCronNotificationSessionKey({ + jobId: job.id, + sessionKey: resolveCronDeliverySessionKey(job), + }); +} diff --git a/src/gateway/server-cron-notifications.ts b/src/gateway/server-cron-notifications.ts new file mode 100644 index 00000000000..adc5409b3db --- /dev/null +++ b/src/gateway/server-cron-notifications.ts @@ -0,0 +1,358 @@ +import type { CliDeps } from "../cli/deps.types.js"; +import type { CronFailureDestinationConfig } from "../config/types.cron.js"; +import type { OpenClawConfig } from "../config/types.openclaw.js"; +import { + resolveCronDeliveryPlan, + resolveFailureDestination, + sendCronAnnouncePayloadStrict, + sendFailureNotificationAnnounce, +} from "../cron/delivery.js"; +import type { CronEvent } from "../cron/service.js"; +import { resolveCronDeliverySessionKey } from "../cron/session-target.js"; +import type { CronJob, CronMessageChannel } from "../cron/types.js"; +import { normalizeHttpWebhookUrl } from "../cron/webhook-url.js"; +import { formatErrorMessage } from "../infra/errors.js"; +import { fetchWithSsrFGuard } from "../infra/net/fetch-guard.js"; +import { SsrFBlockedError } from "../infra/net/ssrf.js"; +import { + normalizeOptionalLowercaseString, + normalizeOptionalString, +} from "../shared/string-coerce.js"; + +const CRON_WEBHOOK_TIMEOUT_MS = 10_000; + +type CronLogger = { + warn: (obj: unknown, msg?: string) => void; +}; + +type CronAgentResolver = (requested?: string | null) => { + agentId: string; + cfg: OpenClawConfig; +}; + +type CronWebhookTarget = { + url: string; + source: "delivery" | "legacy"; +}; + +function redactWebhookUrl(url: string): string { + try { + const parsed = new URL(url); + return `${parsed.origin}${parsed.pathname}`; + } catch { + return ""; + } +} + +function resolveCronWebhookTarget(params: { + delivery?: { mode?: string; to?: string }; + legacyNotify?: boolean; + legacyWebhook?: string; +}): CronWebhookTarget | null { + const mode = normalizeOptionalLowercaseString(params.delivery?.mode); + if (mode === "webhook") { + const url = normalizeHttpWebhookUrl(params.delivery?.to); + return url ? { url, source: "delivery" } : null; + } + + if (params.legacyNotify) { + const legacyUrl = normalizeHttpWebhookUrl(params.legacyWebhook); + if (legacyUrl) { + return { url: legacyUrl, source: "legacy" }; + } + } + + return null; +} + +function buildCronWebhookHeaders(webhookToken?: string): Record { + const headers: Record = { + "Content-Type": "application/json", + }; + if (webhookToken) { + headers.Authorization = `Bearer ${webhookToken}`; + } + return headers; +} + +async function postCronWebhook(params: { + webhookUrl: string; + webhookToken?: string; + payload: unknown; + logContext: Record; + blockedLog: string; + failedLog: string; + logger: CronLogger; +}): Promise { + const abortController = new AbortController(); + const timeout = setTimeout(() => { + abortController.abort(); + }, CRON_WEBHOOK_TIMEOUT_MS); + + try { + const result = await fetchWithSsrFGuard({ + url: params.webhookUrl, + init: { + method: "POST", + headers: buildCronWebhookHeaders(params.webhookToken), + body: JSON.stringify(params.payload), + signal: abortController.signal, + }, + }); + await result.release(); + } catch (err) { + if (err instanceof SsrFBlockedError) { + params.logger.warn( + { + ...params.logContext, + reason: formatErrorMessage(err), + webhookUrl: redactWebhookUrl(params.webhookUrl), + }, + params.blockedLog, + ); + } else { + params.logger.warn( + { + ...params.logContext, + err: formatErrorMessage(err), + webhookUrl: redactWebhookUrl(params.webhookUrl), + }, + params.failedLog, + ); + } + } finally { + clearTimeout(timeout); + } +} + +export async function sendGatewayCronFailureAlert(params: { + deps: CliDeps; + logger: CronLogger; + resolveCronAgent: CronAgentResolver; + webhookToken?: unknown; + job: CronJob; + text: string; + channel: CronMessageChannel; + to?: string; + mode?: "announce" | "webhook"; + accountId?: string; +}): Promise { + const { agentId, cfg: runtimeConfig } = params.resolveCronAgent(params.job.agentId); + const webhookToken = normalizeOptionalString(params.webhookToken); + + if (params.mode === "webhook" && !params.to) { + params.logger.warn( + { jobId: params.job.id }, + "cron: failure alert webhook mode requires URL, skipping", + ); + return; + } + + if (params.mode === "webhook" && params.to) { + const webhookUrl = normalizeHttpWebhookUrl(params.to); + if (webhookUrl) { + await postCronWebhook({ + webhookUrl, + webhookToken, + payload: { + jobId: params.job.id, + jobName: params.job.name, + message: params.text, + }, + logContext: { jobId: params.job.id }, + blockedLog: "cron: failure alert webhook blocked by SSRF guard", + failedLog: "cron: failure alert webhook failed", + logger: params.logger, + }); + } else { + params.logger.warn( + { + jobId: params.job.id, + webhookUrl: redactWebhookUrl(params.to), + }, + "cron: failure alert webhook URL is invalid, skipping", + ); + } + return; + } + + const abortController = new AbortController(); + await sendCronAnnouncePayloadStrict({ + deps: params.deps, + cfg: runtimeConfig, + agentId, + jobId: params.job.id, + target: { + channel: params.channel, + to: params.to, + accountId: params.accountId, + sessionKey: resolveCronDeliverySessionKey(params.job), + }, + message: params.text, + abortSignal: abortController.signal, + }); +} + +export function dispatchGatewayCronFinishedNotifications(params: { + evt: CronEvent; + job?: CronJob; + deps: CliDeps; + logger: CronLogger; + resolveCronAgent: CronAgentResolver; + webhookToken?: unknown; + legacyWebhook?: unknown; + globalFailureDestination?: CronFailureDestinationConfig; + warnedLegacyWebhookJobs: Set; +}): void { + const webhookToken = normalizeOptionalString(params.webhookToken); + const legacyWebhook = normalizeOptionalString(params.legacyWebhook); + const legacyNotify = (params.job as { notify?: unknown } | undefined)?.notify === true; + const webhookTarget = resolveCronWebhookTarget({ + delivery: + params.job?.delivery && typeof params.job.delivery.mode === "string" + ? { mode: params.job.delivery.mode, to: params.job.delivery.to } + : undefined, + legacyNotify, + legacyWebhook, + }); + + if (!webhookTarget && params.job?.delivery?.mode === "webhook") { + params.logger.warn( + { + jobId: params.evt.jobId, + deliveryTo: params.job.delivery.to, + }, + "cron: skipped webhook delivery, delivery.to must be a valid http(s) URL", + ); + } + + if (webhookTarget?.source === "legacy" && !params.warnedLegacyWebhookJobs.has(params.evt.jobId)) { + params.warnedLegacyWebhookJobs.add(params.evt.jobId); + params.logger.warn( + { + jobId: params.evt.jobId, + legacyWebhook: redactWebhookUrl(webhookTarget.url), + }, + "cron: deprecated notify+cron.webhook fallback in use, migrate to delivery.mode=webhook with delivery.to", + ); + } + + if (webhookTarget && params.evt.summary) { + void (async () => { + await postCronWebhook({ + webhookUrl: webhookTarget.url, + webhookToken, + payload: params.evt, + logContext: { jobId: params.evt.jobId }, + blockedLog: "cron: webhook delivery blocked by SSRF guard", + failedLog: "cron: webhook delivery failed", + logger: params.logger, + }); + })(); + } + + dispatchCronFailureDestinationNotifications({ + evt: params.evt, + job: params.job, + deps: params.deps, + logger: params.logger, + resolveCronAgent: params.resolveCronAgent, + webhookToken, + globalFailureDestination: params.globalFailureDestination, + }); +} + +function dispatchCronFailureDestinationNotifications(params: { + evt: CronEvent; + job?: CronJob; + deps: CliDeps; + logger: CronLogger; + resolveCronAgent: CronAgentResolver; + webhookToken?: string; + globalFailureDestination?: CronFailureDestinationConfig; +}): void { + if (params.evt.status !== "error" || !params.job || params.job.delivery?.bestEffort === true) { + return; + } + + const failureMessage = `Cron job "${params.job.name}" failed: ${params.evt.error ?? "unknown error"}`; + const failureDest = resolveFailureDestination(params.job, params.globalFailureDestination); + const deliverySessionKey = resolveCronDeliverySessionKey(params.job); + + if (failureDest) { + const failurePayload = { + jobId: params.job.id, + jobName: params.job.name, + message: failureMessage, + status: params.evt.status, + error: params.evt.error, + runAtMs: params.evt.runAtMs, + durationMs: params.evt.durationMs, + nextRunAtMs: params.evt.nextRunAtMs, + }; + + if (failureDest.mode === "webhook" && failureDest.to) { + const webhookUrl = normalizeHttpWebhookUrl(failureDest.to); + if (webhookUrl) { + void (async () => { + await postCronWebhook({ + webhookUrl, + webhookToken: params.webhookToken, + payload: failurePayload, + logContext: { jobId: params.evt.jobId }, + blockedLog: "cron: failure destination webhook blocked by SSRF guard", + failedLog: "cron: failure destination webhook failed", + logger: params.logger, + }); + })(); + } else { + params.logger.warn( + { + jobId: params.evt.jobId, + webhookUrl: redactWebhookUrl(failureDest.to), + }, + "cron: failure destination webhook URL is invalid, skipping", + ); + } + return; + } + + if (failureDest.mode === "announce") { + const { agentId, cfg: runtimeConfig } = params.resolveCronAgent(params.job.agentId); + void sendFailureNotificationAnnounce( + params.deps, + runtimeConfig, + agentId, + params.job.id, + { + channel: failureDest.channel, + to: failureDest.to, + accountId: failureDest.accountId, + sessionKey: deliverySessionKey, + }, + `⚠️ ${failureMessage}`, + ); + } + return; + } + + const primaryPlan = resolveCronDeliveryPlan(params.job); + if (primaryPlan.mode !== "announce" || !primaryPlan.requested) { + return; + } + + const { agentId, cfg: runtimeConfig } = params.resolveCronAgent(params.job.agentId); + void sendFailureNotificationAnnounce( + params.deps, + runtimeConfig, + agentId, + params.job.id, + { + channel: primaryPlan.channel, + to: primaryPlan.to, + accountId: primaryPlan.accountId, + sessionKey: deliverySessionKey, + }, + `⚠️ ${failureMessage}`, + ); +} diff --git a/src/gateway/server-cron.ts b/src/gateway/server-cron.ts index cb59f80e6cf..ca919f386e1 100644 --- a/src/gateway/server-cron.ts +++ b/src/gateway/server-cron.ts @@ -1,7 +1,6 @@ import { resolveDefaultAgentId } from "../agents/agent-scope.js"; import { cleanupBrowserSessionsForLifecycleEnd } from "../browser-lifecycle-cleanup.js"; import type { CliDeps } from "../cli/deps.types.js"; -import { createOutboundSendDeps } from "../cli/outbound-send-deps.js"; import { loadConfig } from "../config/config.js"; import { canonicalizeMainSessionAlias, @@ -10,40 +9,25 @@ import { } from "../config/sessions.js"; import { resolveStorePath } from "../config/sessions/paths.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; -import { - resolveCronDeliveryPlan, - resolveFailureDestination, - sendFailureNotificationAnnounce, -} from "../cron/delivery.js"; import { runCronIsolatedAgentTurn } from "../cron/isolated-agent.js"; -import { resolveDeliveryTarget } from "../cron/isolated-agent/delivery-target.js"; import { appendCronRunLog, resolveCronRunLogPath, resolveCronRunLogPruneOptions, } from "../cron/run-log.js"; import { CronService } from "../cron/service.js"; -import { - resolveCronDeliverySessionKey, - resolveCronSessionTargetSessionKey, -} from "../cron/session-target.js"; +import { resolveCronSessionTargetSessionKey } from "../cron/session-target.js"; import { resolveCronStorePath } from "../cron/store.js"; -import { normalizeHttpWebhookUrl } from "../cron/webhook-url.js"; -import { formatErrorMessage } from "../infra/errors.js"; import { runHeartbeatOnce } from "../infra/heartbeat-runner.js"; import { requestHeartbeatNow } from "../infra/heartbeat-wake.js"; -import { fetchWithSsrFGuard } from "../infra/net/fetch-guard.js"; -import { SsrFBlockedError } from "../infra/net/ssrf.js"; -import { deliverOutboundPayloads } from "../infra/outbound/deliver.js"; -import { buildOutboundSessionContext } from "../infra/outbound/session-context.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; import { getChildLogger } from "../logging.js"; import { normalizeAgentId, toAgentStoreSessionKey } from "../routing/session-key.js"; import { defaultRuntime } from "../runtime.js"; import { - normalizeOptionalLowercaseString, - normalizeOptionalString, -} from "../shared/string-coerce.js"; + dispatchGatewayCronFinishedNotifications, + sendGatewayCronFailureAlert, +} from "./server-cron-notifications.js"; export type GatewayCronState = { cron: CronService; @@ -51,103 +35,6 @@ export type GatewayCronState = { cronEnabled: boolean; }; -const CRON_WEBHOOK_TIMEOUT_MS = 10_000; - -function redactWebhookUrl(url: string): string { - try { - const parsed = new URL(url); - return `${parsed.origin}${parsed.pathname}`; - } catch { - return ""; - } -} - -type CronWebhookTarget = { - url: string; - source: "delivery" | "legacy"; -}; - -function resolveCronWebhookTarget(params: { - delivery?: { mode?: string; to?: string }; - legacyNotify?: boolean; - legacyWebhook?: string; -}): CronWebhookTarget | null { - const mode = normalizeOptionalLowercaseString(params.delivery?.mode); - if (mode === "webhook") { - const url = normalizeHttpWebhookUrl(params.delivery?.to); - return url ? { url, source: "delivery" } : null; - } - - if (params.legacyNotify) { - const legacyUrl = normalizeHttpWebhookUrl(params.legacyWebhook); - if (legacyUrl) { - return { url: legacyUrl, source: "legacy" }; - } - } - - return null; -} - -function buildCronWebhookHeaders(webhookToken?: string): Record { - const headers: Record = { - "Content-Type": "application/json", - }; - if (webhookToken) { - headers.Authorization = `Bearer ${webhookToken}`; - } - return headers; -} - -async function postCronWebhook(params: { - webhookUrl: string; - webhookToken?: string; - payload: unknown; - logContext: Record; - blockedLog: string; - failedLog: string; - logger: ReturnType; -}): Promise { - const abortController = new AbortController(); - const timeout = setTimeout(() => { - abortController.abort(); - }, CRON_WEBHOOK_TIMEOUT_MS); - - try { - const result = await fetchWithSsrFGuard({ - url: params.webhookUrl, - init: { - method: "POST", - headers: buildCronWebhookHeaders(params.webhookToken), - body: JSON.stringify(params.payload), - signal: abortController.signal, - }, - }); - await result.release(); - } catch (err) { - if (err instanceof SsrFBlockedError) { - params.logger.warn( - { - ...params.logContext, - reason: formatErrorMessage(err), - webhookUrl: redactWebhookUrl(params.webhookUrl), - }, - params.blockedLog, - ); - } else { - params.logger.warn( - { - ...params.logContext, - err: formatErrorMessage(err), - webhookUrl: redactWebhookUrl(params.webhookUrl), - }, - params.failedLog, - ); - } - } finally { - clearTimeout(timeout); - } -} - export function buildGatewayCronService(params: { cfg: OpenClawConfig; deps: CliDeps; @@ -355,207 +242,36 @@ export function buildGatewayCronService(params: { }); } }, - sendCronFailureAlert: async ({ job, text, channel, to, mode, accountId }) => { - const { agentId, cfg: runtimeConfig } = resolveCronAgent(job.agentId); - const webhookToken = normalizeOptionalString(params.cfg.cron?.webhookToken); - - // Webhook mode requires a URL - fail closed if missing - if (mode === "webhook" && !to) { - cronLogger.warn( - { jobId: job.id }, - "cron: failure alert webhook mode requires URL, skipping", - ); - return; - } - - if (mode === "webhook" && to) { - const webhookUrl = normalizeHttpWebhookUrl(to); - if (webhookUrl) { - await postCronWebhook({ - webhookUrl, - webhookToken, - payload: { - jobId: job.id, - jobName: job.name, - message: text, - }, - logContext: { jobId: job.id }, - blockedLog: "cron: failure alert webhook blocked by SSRF guard", - failedLog: "cron: failure alert webhook failed", - logger: cronLogger, - }); - } else { - cronLogger.warn( - { - jobId: job.id, - webhookUrl: redactWebhookUrl(to), - }, - "cron: failure alert webhook URL is invalid, skipping", - ); - } - return; - } - - const deliverySessionKey = resolveCronDeliverySessionKey(job); - const target = await resolveDeliveryTarget(runtimeConfig, agentId, { + sendCronFailureAlert: async ({ job, text, channel, to, mode, accountId }) => + await sendGatewayCronFailureAlert({ + deps: params.deps, + logger: cronLogger, + resolveCronAgent, + webhookToken: params.cfg.cron?.webhookToken, + job, + text, channel, to, + mode, accountId, - sessionKey: deliverySessionKey, - }); - if (!target.ok) { - throw target.error; - } - const session = buildOutboundSessionContext({ - cfg: runtimeConfig, - agentId, - sessionKey: deliverySessionKey ?? `cron:${job.id}:failure`, - }); - await deliverOutboundPayloads({ - cfg: runtimeConfig, - channel: target.channel, - to: target.to, - accountId: target.accountId, - threadId: target.threadId, - payloads: [{ text }], - deps: createOutboundSendDeps(params.deps), - session, - }); - }, + }), log: getChildLogger({ module: "cron", storePath }), onEvent: (evt) => { params.broadcast("cron", evt, { dropIfSlow: true }); if (evt.action === "finished") { - const webhookToken = normalizeOptionalString(params.cfg.cron?.webhookToken); - const legacyWebhook = normalizeOptionalString(params.cfg.cron?.webhook); const job = cron.getJob(evt.jobId); - const legacyNotify = (job as { notify?: unknown } | undefined)?.notify === true; - const webhookTarget = resolveCronWebhookTarget({ - delivery: - job?.delivery && typeof job.delivery.mode === "string" - ? { mode: job.delivery.mode, to: job.delivery.to } - : undefined, - legacyNotify, - legacyWebhook, + dispatchGatewayCronFinishedNotifications({ + evt, + job, + deps: params.deps, + logger: cronLogger, + resolveCronAgent, + webhookToken: params.cfg.cron?.webhookToken, + legacyWebhook: params.cfg.cron?.webhook, + globalFailureDestination: params.cfg.cron?.failureDestination, + warnedLegacyWebhookJobs, }); - if (!webhookTarget && job?.delivery?.mode === "webhook") { - cronLogger.warn( - { - jobId: evt.jobId, - deliveryTo: job.delivery.to, - }, - "cron: skipped webhook delivery, delivery.to must be a valid http(s) URL", - ); - } - - if (webhookTarget?.source === "legacy" && !warnedLegacyWebhookJobs.has(evt.jobId)) { - warnedLegacyWebhookJobs.add(evt.jobId); - cronLogger.warn( - { - jobId: evt.jobId, - legacyWebhook: redactWebhookUrl(webhookTarget.url), - }, - "cron: deprecated notify+cron.webhook fallback in use, migrate to delivery.mode=webhook with delivery.to", - ); - } - - if (webhookTarget && evt.summary) { - void (async () => { - await postCronWebhook({ - webhookUrl: webhookTarget.url, - webhookToken, - payload: evt, - logContext: { jobId: evt.jobId }, - blockedLog: "cron: webhook delivery blocked by SSRF guard", - failedLog: "cron: webhook delivery failed", - logger: cronLogger, - }); - })(); - } - - if (evt.status === "error" && job) { - const isBestEffort = job.delivery?.bestEffort === true; - if (!isBestEffort) { - const failureMessage = `Cron job "${job.name}" failed: ${evt.error ?? "unknown error"}`; - const failureDest = resolveFailureDestination(job, params.cfg.cron?.failureDestination); - const deliverySessionKey = resolveCronDeliverySessionKey(job); - - if (failureDest) { - // Explicit failureDestination configured — use it - const failurePayload = { - jobId: job.id, - jobName: job.name, - message: failureMessage, - status: evt.status, - error: evt.error, - runAtMs: evt.runAtMs, - durationMs: evt.durationMs, - nextRunAtMs: evt.nextRunAtMs, - }; - - if (failureDest.mode === "webhook" && failureDest.to) { - const webhookUrl = normalizeHttpWebhookUrl(failureDest.to); - if (webhookUrl) { - void (async () => { - await postCronWebhook({ - webhookUrl, - webhookToken, - payload: failurePayload, - logContext: { jobId: evt.jobId }, - blockedLog: "cron: failure destination webhook blocked by SSRF guard", - failedLog: "cron: failure destination webhook failed", - logger: cronLogger, - }); - })(); - } else { - cronLogger.warn( - { - jobId: evt.jobId, - webhookUrl: redactWebhookUrl(failureDest.to), - }, - "cron: failure destination webhook URL is invalid, skipping", - ); - } - } else if (failureDest.mode === "announce") { - const { agentId, cfg: runtimeConfig } = resolveCronAgent(job.agentId); - void sendFailureNotificationAnnounce( - params.deps, - runtimeConfig, - agentId, - job.id, - { - channel: failureDest.channel, - to: failureDest.to, - accountId: failureDest.accountId, - sessionKey: deliverySessionKey, - }, - `⚠️ ${failureMessage}`, - ); - } - } else { - // No explicit failureDestination — fall back to primary delivery channel (#60608) - const primaryPlan = resolveCronDeliveryPlan(job); - if (primaryPlan.mode === "announce" && primaryPlan.requested) { - const { agentId, cfg: runtimeConfig } = resolveCronAgent(job.agentId); - void sendFailureNotificationAnnounce( - params.deps, - runtimeConfig, - agentId, - job.id, - { - channel: primaryPlan.channel, - to: primaryPlan.to, - accountId: primaryPlan.accountId, - sessionKey: deliverySessionKey, - }, - `⚠️ ${failureMessage}`, - ); - } - } - } - } - const logPath = resolveCronRunLogPath({ storePath, jobId: evt.jobId,