diff --git a/CHANGELOG.md b/CHANGELOG.md index bc4c9da6547..f29385ec6e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ Docs: https://docs.openclaw.ai - Plugin releases: require external package compatibility metadata in the npm plugin publish plan, matching the ClawHub package contract before packages ship. - Agents/OpenAI-compatible: honor per-model `max_completion_tokens`/`max_tokens` params in embedded OpenAI-completions runs so high-token Kimi-style routes keep their configured completion cap. Fixes #82230. Thanks @albert-zen. - Agents/local: install a local gateway request scope around trusted `openclaw agent --local` runs, so subagent completion announces can use in-process gateway dispatch without crashing. Fixes #82140. Thanks @Kushmaro. +- Cron: keep failed isolated-agent runs from marking successful result delivery when only the failure notification was delivered. Fixes #72985. Thanks @Allenbluff. - Discord: validate message-read results before normalizing channel history and report unexpected payloads with a Discord boundary error instead of `map is not a function`. Fixes #82252. Thanks @jessewunderlich. - Agents/runtime: apply `agents.defaults.models["provider/*"].agentRuntime` as provider-wide model runtime policy while preserving exact model runtime precedence. Fixes #82243. Thanks @rendrag-git. - Agents/auto-reply: restrict `NO_REPLY` prompt guidance to automatic group/channel replies, remove legacy silent-reply rewrites, and suppress accidental direct-chat silent tokens instead of delivering fallback text. Fixes #82254. Thanks @absol89. diff --git a/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift b/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift index ba071b2c0e3..738ddf8b0f1 100644 --- a/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift +++ b/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift @@ -5293,6 +5293,7 @@ public struct CronRunLogEntry: Codable, Sendable { public let delivered: Bool? public let deliverystatus: AnyCodable? public let deliveryerror: String? + public let failurenotificationdelivery: [String: AnyCodable]? public let sessionid: String? public let sessionkey: String? public let runid: String? @@ -5315,6 +5316,7 @@ public struct CronRunLogEntry: Codable, Sendable { delivered: Bool?, deliverystatus: AnyCodable?, deliveryerror: String?, + failurenotificationdelivery: [String: AnyCodable]? = nil, sessionid: String?, sessionkey: String?, runid: String?, @@ -5336,6 +5338,7 @@ public struct CronRunLogEntry: Codable, Sendable { self.delivered = delivered self.deliverystatus = deliverystatus self.deliveryerror = deliveryerror + self.failurenotificationdelivery = failurenotificationdelivery self.sessionid = sessionid self.sessionkey = sessionkey self.runid = runid @@ -5359,6 +5362,7 @@ public struct CronRunLogEntry: Codable, Sendable { case delivered case deliverystatus = "deliveryStatus" case deliveryerror = "deliveryError" + case failurenotificationdelivery = "failureNotificationDelivery" case sessionid = "sessionId" case sessionkey = "sessionKey" case runid = "runId" diff --git a/scripts/protocol-gen-swift.ts b/scripts/protocol-gen-swift.ts index c755256211f..750298190aa 100644 --- a/scripts/protocol-gen-swift.ts +++ b/scripts/protocol-gen-swift.ts @@ -42,6 +42,7 @@ const STRICT_LITERAL_STRUCTS = new Set([ const DEFAULTED_OPTIONAL_INIT_PARAMS: Record> = { MessageActionParams: new Set(["inboundTurnKind"]), + CronRunLogEntry: new Set(["failureNotificationDelivery"]), }; const header = `// Generated by scripts/protocol-gen-swift.ts — do not edit by hand\n// swiftlint:disable file_length\nimport Foundation\n\npublic let GATEWAY_PROTOCOL_VERSION = ${PROTOCOL_VERSION}\npublic let GATEWAY_MIN_PROTOCOL_VERSION = ${MIN_CLIENT_PROTOCOL_VERSION}\n\nprivate struct GatewayAnyCodingKey: CodingKey, Hashable {\n let stringValue: String\n let intValue: Int?\n\n init?(stringValue: String) {\n self.stringValue = stringValue\n self.intValue = nil\n }\n\n init?(intValue: Int) {\n self.stringValue = String(intValue)\n self.intValue = intValue\n }\n}\n\npublic enum ErrorCode: String, Codable, Sendable {\n${Object.values( diff --git a/src/cron/cron-protocol-schema.test.ts b/src/cron/cron-protocol-schema.test.ts index b0a9be03b93..76e6689f089 100644 --- a/src/cron/cron-protocol-schema.test.ts +++ b/src/cron/cron-protocol-schema.test.ts @@ -15,4 +15,11 @@ describe("cron protocol schema", () => { } expect(lastStatus.deprecated).toBe(true); }); + + it("exposes failure-notification delivery state", () => { + const properties = (CronJobStateSchema as SchemaLike).properties ?? {}; + expect(properties.lastFailureNotificationDelivered).toBeDefined(); + expect(properties.lastFailureNotificationDeliveryStatus).toBeDefined(); + expect(properties.lastFailureNotificationDeliveryError).toBeDefined(); + }); }); diff --git a/src/cron/run-log.test.ts b/src/cron/run-log.test.ts index 300cdeb922d..9db3a814da2 100644 --- a/src/cron/run-log.test.ts +++ b/src/cron/run-log.test.ts @@ -242,6 +242,10 @@ describe("cron run log", () => { delivered: true, deliveryStatus: "not-delivered", deliveryError: "announce failed", + failureNotificationDelivery: { + delivered: true, + status: "delivered", + }, delivery: { intended: { channel: "last", to: null, source: "last" }, resolved: { ok: true, channel: "telegram", to: "-100", source: "last" }, @@ -260,6 +264,10 @@ describe("cron run log", () => { expect(entries[0]?.delivered).toBe(true); expect(entries[0]?.deliveryStatus).toBe("not-delivered"); expect(entries[0]?.deliveryError).toBe("announce failed"); + expect(entries[0]?.failureNotificationDelivery).toEqual({ + delivered: true, + status: "delivered", + }); expect(entries[0]?.delivery).toEqual({ intended: { channel: "last", to: null, source: "last" }, resolved: { ok: true, channel: "telegram", to: "-100", source: "last" }, diff --git a/src/cron/run-log.ts b/src/cron/run-log.ts index 5f7f937142e..270c9cbecfd 100644 --- a/src/cron/run-log.ts +++ b/src/cron/run-log.ts @@ -14,6 +14,7 @@ import { normalizeCronRunDiagnostics } from "./run-diagnostics.js"; import type { CronDeliveryStatus, CronDeliveryTrace, + CronFailureNotificationDelivery, CronRunDiagnostics, CronRunStatus, CronRunTelemetry, @@ -30,6 +31,7 @@ export type CronRunLogEntry = { delivered?: boolean; deliveryStatus?: CronDeliveryStatus; deliveryError?: string; + failureNotificationDelivery?: CronFailureNotificationDelivery; delivery?: CronDeliveryTrace; sessionId?: string; sessionKey?: string; @@ -350,6 +352,29 @@ function parseAllRunLogEntries(raw: string, opts?: { jobId?: string }): CronRunL if (typeof obj.deliveryError === "string") { entry.deliveryError = obj.deliveryError; } + if (obj.failureNotificationDelivery && typeof obj.failureNotificationDelivery === "object") { + const failureNotificationDelivery = obj.failureNotificationDelivery as { + delivered?: unknown; + status?: unknown; + error?: unknown; + }; + if ( + failureNotificationDelivery.status === "delivered" || + failureNotificationDelivery.status === "not-delivered" || + failureNotificationDelivery.status === "unknown" || + failureNotificationDelivery.status === "not-requested" + ) { + entry.failureNotificationDelivery = { + status: failureNotificationDelivery.status, + ...(typeof failureNotificationDelivery.delivered === "boolean" + ? { delivered: failureNotificationDelivery.delivered } + : {}), + ...(typeof failureNotificationDelivery.error === "string" + ? { error: failureNotificationDelivery.error } + : {}), + }; + } + } if (obj.delivery && typeof obj.delivery === "object") { entry.delivery = obj.delivery; } diff --git a/src/cron/service.persists-delivered-status.test.ts b/src/cron/service.persists-delivered-status.test.ts index 68d39fdeca0..89c4645ad8d 100644 --- a/src/cron/service.persists-delivered-status.test.ts +++ b/src/cron/service.persists-delivered-status.test.ts @@ -33,6 +33,48 @@ function buildAnnounceIsolatedAgentTurnJob(name: string): CronAddInput { }; } +function buildAnnounceWithFailureDestinationJob(name: string): CronAddInput { + return { + ...buildAnnounceIsolatedAgentTurnJob(name), + delivery: { + mode: "announce", + channel: "forum", + to: "123", + failureDestination: { + mode: "webhook", + to: "https://example.invalid/cron-failure", + }, + }, + }; +} + +function buildFailureDestinationOnlyJob(name: string): CronAddInput { + return { + ...buildIsolatedAgentTurnJob(name), + delivery: { + mode: "none", + failureDestination: { + mode: "webhook", + to: "https://example.invalid/cron-failure", + }, + }, + }; +} + +function buildBestEffortFailureDestinationOnlyJob(name: string): CronAddInput { + return { + ...buildFailureDestinationOnlyJob(name), + delivery: { + mode: "none", + bestEffort: true, + failureDestination: { + mode: "webhook", + to: "https://example.invalid/cron-failure", + }, + }, + }; +} + function buildMainSessionSystemEventJob(name: string): CronAddInput { return { name, @@ -46,9 +88,19 @@ function buildMainSessionSystemEventJob(name: string): CronAddInput { function createIsolatedCronWithFinishedBarrier(params: { storePath: string; + status?: "ok" | "error"; delivered?: boolean; error?: string; - onFinished?: (evt: { jobId: string; delivered?: boolean; deliveryStatus?: string }) => void; + onFinished?: (evt: { + jobId: string; + delivered?: boolean; + deliveryStatus?: string; + failureNotificationDelivery?: { + delivered?: boolean; + status: string; + error?: string; + }; + }) => void; }) { const finished = createFinishedBarrier(); const cron = new CronService({ @@ -58,7 +110,7 @@ function createIsolatedCronWithFinishedBarrier(params: { enqueueSystemEvent: vi.fn(), requestHeartbeat: vi.fn(), runIsolatedAgentJob: vi.fn(async () => ({ - status: "ok" as const, + status: params.status ?? ("ok" as const), summary: "done", ...(params.error === undefined ? {} : { error: params.error }), ...(params.delivered === undefined ? {} : { delivered: params.delivered }), @@ -69,6 +121,7 @@ function createIsolatedCronWithFinishedBarrier(params: { jobId: evt.jobId, delivered: evt.delivered, deliveryStatus: evt.deliveryStatus, + failureNotificationDelivery: evt.failureNotificationDelivery, }); } finished.onEvent(evt); @@ -81,11 +134,13 @@ async function runSingleJobAndReadState(params: { cron: CronService; finished: ReturnType; job: CronAddInput; + waitForFinished?: (jobId: string) => Promise; }) { const job = await params.cron.add(params.job); + const finishedPromise = params.waitForFinished?.(job.id) ?? params.finished.waitForOk(job.id); vi.setSystemTime(new Date(job.state.nextRunAtMs! + 5)); await vi.runOnlyPendingTimersAsync(); - await params.finished.waitForOk(job.id); + await finishedPromise; const jobs = await params.cron.list({ includeDisabled: true }); return { job, updated: jobs.find((entry) => entry.id === job.id) }; @@ -113,6 +168,9 @@ function expectDeliveryNotRequested( lastDelivered?: boolean; lastDeliveryStatus?: string; lastDeliveryError?: string; + lastFailureNotificationDelivered?: boolean; + lastFailureNotificationDeliveryStatus?: string; + lastFailureNotificationDeliveryError?: string; }; } | undefined, @@ -121,20 +179,38 @@ function expectDeliveryNotRequested( expect(updated?.state.lastDelivered).toBeUndefined(); expect(updated?.state.lastDeliveryStatus).toBe("not-requested"); expect(updated?.state.lastDeliveryError).toBeUndefined(); + expect(updated?.state.lastFailureNotificationDelivered).toBeUndefined(); + expect(updated?.state.lastFailureNotificationDeliveryStatus).toBe("not-requested"); + expect(updated?.state.lastFailureNotificationDeliveryError).toBeUndefined(); } async function runIsolatedJobAndReadState(params: { job: CronAddInput; + status?: "ok" | "error"; delivered?: boolean; error?: string; - onFinished?: (evt: { jobId: string; delivered?: boolean; deliveryStatus?: string }) => void; + onFinished?: (evt: { + jobId: string; + delivered?: boolean; + deliveryStatus?: string; + failureNotificationDelivery?: { + delivered?: boolean; + status: string; + error?: string; + }; + }) => void; }) { const store = await makeStorePath(); + const finishedEvents = new Map void>(); const { cron, finished } = createIsolatedCronWithFinishedBarrier({ storePath: store.storePath, + ...(params.status !== undefined ? { status: params.status } : {}), ...(params.delivered !== undefined ? { delivered: params.delivered } : {}), ...(params.error !== undefined ? { error: params.error } : {}), - ...(params.onFinished ? { onFinished: params.onFinished } : {}), + onFinished: (evt) => { + params.onFinished?.(evt); + finishedEvents.get(evt.jobId)?.(evt); + }, }); await cron.start(); @@ -143,6 +219,10 @@ async function runIsolatedJobAndReadState(params: { cron, finished, job: params.job, + waitForFinished: (jobId) => + new Promise((resolve) => { + finishedEvents.set(jobId, resolve); + }), }); return updated; } finally { @@ -160,6 +240,8 @@ describe("CronService persists delivered status", () => { expect(updated?.state.lastDelivered).toBe(true); expect(updated?.state.lastDeliveryStatus).toBe("delivered"); expect(updated?.state.lastDeliveryError).toBeUndefined(); + expect(updated?.state.lastFailureNotificationDelivered).toBeUndefined(); + expect(updated?.state.lastFailureNotificationDeliveryStatus).toBe("not-requested"); }); it("persists lastDelivered=false when isolated job explicitly reports not delivered", async () => { @@ -171,6 +253,136 @@ describe("CronService persists delivered status", () => { expect(updated?.state.lastDelivered).toBe(false); expect(updated?.state.lastDeliveryStatus).toBe("not-delivered"); expect(updated?.state.lastDeliveryError).toBeUndefined(); + expect(updated?.state.lastFailureNotificationDelivered).toBeUndefined(); + expect(updated?.state.lastFailureNotificationDeliveryStatus).toBe("not-requested"); + }); + + it("keeps failure notification delivery separate from successful result delivery", async () => { + let capturedEvent: + | { + delivered?: boolean; + deliveryStatus?: string; + failureNotificationDelivery?: { + delivered?: boolean; + status: string; + error?: string; + }; + } + | undefined; + const updated = await runIsolatedJobAndReadState({ + job: buildAnnounceIsolatedAgentTurnJob("error-notification-delivered"), + status: "error", + delivered: true, + error: "Agent couldn't generate a response.", + onFinished: (evt) => { + capturedEvent = evt; + }, + }); + + expect(updated?.state.lastRunStatus).toBe("error"); + expect(updated?.state.lastDelivered).toBe(false); + expect(updated?.state.lastDeliveryStatus).toBe("not-delivered"); + expect(updated?.state.lastDeliveryError).toBe("Agent couldn't generate a response."); + expect(updated?.state.lastFailureNotificationDelivered).toBe(true); + expect(updated?.state.lastFailureNotificationDeliveryStatus).toBe("delivered"); + expect(updated?.state.lastFailureNotificationDeliveryError).toBeUndefined(); + expect(capturedEvent?.delivered).toBe(false); + expect(capturedEvent?.deliveryStatus).toBe("not-delivered"); + expect(capturedEvent?.failureNotificationDelivery).toEqual({ + delivered: true, + status: "delivered", + }); + }); + + it("marks failure-destination-only error notification delivery unknown", async () => { + let capturedEvent: + | { + delivered?: boolean; + deliveryStatus?: string; + failureNotificationDelivery?: { + delivered?: boolean; + status: string; + error?: string; + }; + } + | undefined; + const updated = await runIsolatedJobAndReadState({ + job: buildFailureDestinationOnlyJob("failure-destination-only"), + status: "error", + error: "Agent couldn't generate a response.", + onFinished: (evt) => { + capturedEvent = evt; + }, + }); + + expect(updated?.state.lastRunStatus).toBe("error"); + expect(updated?.state.lastDelivered).toBeUndefined(); + expect(updated?.state.lastDeliveryStatus).toBe("not-requested"); + expect(updated?.state.lastFailureNotificationDelivered).toBeUndefined(); + expect(updated?.state.lastFailureNotificationDeliveryStatus).toBe("unknown"); + expect(capturedEvent?.delivered).toBeUndefined(); + expect(capturedEvent?.deliveryStatus).toBe("not-requested"); + expect(capturedEvent?.failureNotificationDelivery).toEqual({ status: "unknown" }); + }); + + it("does not treat primary error delivery as alternate failure-destination delivery", async () => { + let capturedEvent: + | { + delivered?: boolean; + deliveryStatus?: string; + failureNotificationDelivery?: { + delivered?: boolean; + status: string; + error?: string; + }; + } + | undefined; + const updated = await runIsolatedJobAndReadState({ + job: buildAnnounceWithFailureDestinationJob("announce-plus-failure-destination"), + status: "error", + delivered: true, + error: "Agent couldn't generate a response.", + onFinished: (evt) => { + capturedEvent = evt; + }, + }); + + expect(updated?.state.lastRunStatus).toBe("error"); + expect(updated?.state.lastDelivered).toBe(false); + expect(updated?.state.lastDeliveryStatus).toBe("not-delivered"); + expect(updated?.state.lastFailureNotificationDelivered).toBeUndefined(); + expect(updated?.state.lastFailureNotificationDeliveryStatus).toBe("unknown"); + expect(capturedEvent?.delivered).toBe(false); + expect(capturedEvent?.failureNotificationDelivery).toEqual({ status: "unknown" }); + }); + + it("keeps best-effort failure destinations suppressed", async () => { + let capturedEvent: + | { + delivered?: boolean; + deliveryStatus?: string; + failureNotificationDelivery?: { + delivered?: boolean; + status: string; + error?: string; + }; + } + | undefined; + const updated = await runIsolatedJobAndReadState({ + job: buildBestEffortFailureDestinationOnlyJob("best-effort-failure-destination-only"), + status: "error", + error: "Agent couldn't generate a response.", + onFinished: (evt) => { + capturedEvent = evt; + }, + }); + + expect(updated?.state.lastRunStatus).toBe("error"); + expect(updated?.state.lastDeliveryStatus).toBe("not-requested"); + expect(updated?.state.lastFailureNotificationDelivered).toBeUndefined(); + expect(updated?.state.lastFailureNotificationDeliveryStatus).toBe("not-requested"); + expect(capturedEvent?.deliveryStatus).toBe("not-requested"); + expect(capturedEvent?.failureNotificationDelivery).toBeUndefined(); }); it("suppresses delivered=false when delivery.mode none opts out of delivery", async () => { diff --git a/src/cron/service/ops.test.ts b/src/cron/service/ops.test.ts index ca1b3c08fb0..32326d66dbc 100644 --- a/src/cron/service/ops.test.ts +++ b/src/cron/service/ops.test.ts @@ -72,6 +72,8 @@ function createInterruptedMainJob(now: number): CronJob { state: { nextRunAtMs: now - 60_000, runningAtMs: now - 30 * 60_000, + lastFailureNotificationDelivered: true, + lastFailureNotificationDeliveryStatus: "delivered", }, }; } @@ -198,6 +200,9 @@ describe("cron service ops seam coverage", () => { expect(job.state.lastRunStatus).toBe("error"); expect(job.state.lastRunAtMs).toBe(now - 30 * 60_000); expect(job.state.lastError).toBe("cron: job interrupted by gateway restart"); + expect(job.state.lastFailureNotificationDelivered).toBeUndefined(); + expect(job.state.lastFailureNotificationDeliveryStatus).toBe("not-requested"); + expect(job.state.lastFailureNotificationDeliveryError).toBeUndefined(); expect((job.state.nextRunAtMs ?? 0) > now).toBe(true); const delays = timeoutSpy.mock.calls diff --git a/src/cron/service/ops.ts b/src/cron/service/ops.ts index 9394ddc9c09..986067e2825 100644 --- a/src/cron/service/ops.ts +++ b/src/cron/service/ops.ts @@ -8,6 +8,7 @@ import { failTaskRunByRunId, } from "../../tasks/detached-task-runtime.js"; import { clearCronJobActive, markCronJobActive } from "../active-jobs.js"; +import { resolveCronDeliveryPlan, resolveFailureDestination } from "../delivery-plan.js"; import { createCronRunDiagnosticsFromError } from "../run-diagnostics.js"; import { createCronExecutionId } from "../run-id.js"; import type { CronJob, CronJobCreate, CronJobPatch } from "../types.js"; @@ -40,6 +41,7 @@ import { armTimer, emit, executeJobCoreWithTimeout, + failureNotificationDeliveryFromJobState, normalizeCronRunErrorText, runMissedJobs, stopTimer, @@ -54,6 +56,20 @@ type InterruptedStartupRun = { durationMs: number; }; +function resolveInterruptedStartupFailureNotificationStatus(params: { + state: CronServiceState; + job: CronJob; +}) { + if (params.job.delivery?.bestEffort === true) { + return "not-requested"; + } + if (resolveFailureDestination(params.job, params.state.deps.cronConfig?.failureDestination)) { + return "unknown"; + } + const primaryPlan = resolveCronDeliveryPlan(params.job); + return primaryPlan.mode === "announce" && primaryPlan.requested ? "unknown" : "not-requested"; +} + function markInterruptedStartupRun(params: { state: CronServiceState; job: CronJob; @@ -61,6 +77,10 @@ function markInterruptedStartupRun(params: { nowMs: number; }): InterruptedStartupRun { const { job, runningAtMs, nowMs } = params; + const failureNotificationStatus = resolveInterruptedStartupFailureNotificationStatus({ + state: params.state, + job, + }); const previousErrors = typeof job.state.consecutiveErrors === "number" && Number.isFinite(job.state.consecutiveErrors) ? Math.max(0, Math.floor(job.state.consecutiveErrors)) @@ -81,6 +101,9 @@ function markInterruptedStartupRun(params: { job.state.lastDelivered = false; job.state.lastDeliveryStatus = "unknown"; job.state.lastDeliveryError = STARTUP_INTERRUPTED_ERROR; + job.state.lastFailureNotificationDelivered = undefined; + job.state.lastFailureNotificationDeliveryStatus = failureNotificationStatus; + job.state.lastFailureNotificationDeliveryError = undefined; job.state.nextRunAtMs = undefined; job.updatedAtMs = nowMs; @@ -194,6 +217,7 @@ export async function start(state: CronServiceState) { delivered: false, deliveryStatus: "unknown", deliveryError: STARTUP_INTERRUPTED_ERROR, + failureNotificationDelivery: job ? failureNotificationDeliveryFromJobState(job) : undefined, runAtMs: interrupted.runAtMs, durationMs: interrupted.durationMs, nextRunAtMs: job?.state.nextRunAtMs, @@ -524,6 +548,7 @@ async function skipInvalidPersistedManualRun(params: { nextRunAtMs: params.job.state.nextRunAtMs, deliveryStatus: params.job.state.lastDeliveryStatus, deliveryError: params.job.state.lastDeliveryError, + failureNotificationDelivery: failureNotificationDeliveryFromJobState(params.job), }); if (shouldDelete && params.state.store) { @@ -762,9 +787,10 @@ async function finishPreparedManualRun( error: coreResult.error, summary: coreResult.summary, diagnostics: coreResult.diagnostics, - delivered: coreResult.delivered, + delivered: job.state.lastDelivered, deliveryStatus: job.state.lastDeliveryStatus, deliveryError: job.state.lastDeliveryError, + failureNotificationDelivery: failureNotificationDeliveryFromJobState(job), delivery: coreResult.delivery, sessionId: coreResult.sessionId, sessionKey: coreResult.sessionKey, diff --git a/src/cron/service/state.ts b/src/cron/service/state.ts index ee8d90876c9..5abf14cab3b 100644 --- a/src/cron/service/state.ts +++ b/src/cron/service/state.ts @@ -3,6 +3,7 @@ import type { HeartbeatRunResult, HeartbeatWakeRequest } from "../../infra/heart import type { CronAgentExecutionPhaseUpdate, CronAgentExecutionStarted, + CronFailureNotificationDelivery, CronDeliveryStatus, CronDeliveryTrace, CronJob, @@ -30,6 +31,7 @@ export type CronEvent = { delivered?: boolean; deliveryStatus?: CronDeliveryStatus; deliveryError?: string; + failureNotificationDelivery?: CronFailureNotificationDelivery; delivery?: CronDeliveryTrace; sessionId?: string; sessionKey?: string; diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index cdbba4e39d7..4ee9a7fc9d3 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -14,7 +14,7 @@ import { failTaskRunByRunId, } from "../../tasks/detached-task-runtime.js"; import { clearCronJobActive, markCronJobActive } from "../active-jobs.js"; -import { resolveCronDeliveryPlan } from "../delivery-plan.js"; +import { resolveCronDeliveryPlan, resolveFailureDestination } from "../delivery-plan.js"; import { createCronRunDiagnosticsFromError, normalizeCronRunDiagnostics, @@ -28,6 +28,7 @@ import type { CronAgentExecutionStarted, CronDeliveryStatus, CronDeliveryTrace, + CronFailureNotificationDelivery, CronJob, CronMessageChannel, CronRunOutcome, @@ -539,20 +540,94 @@ function resolveRetryConfig(cronConfig?: CronConfig) { }; } -function resolveDeliveryState(params: { job: CronJob; delivered?: boolean }): { +function resolveDeliveryState(params: { + job: CronJob; + runStatus: CronRunStatus; + delivered?: boolean; + error?: string; + globalFailureDestination?: CronConfig["failureDestination"]; +}): { delivered?: boolean; status: CronDeliveryStatus; + error?: string; + failureNotification: CronFailureNotificationDelivery; } { - if (!resolveCronDeliveryPlan(params.job).requested) { - return { status: "not-requested" }; + const primaryDeliveryRequested = resolveCronDeliveryPlan(params.job).requested; + const alternateFailureNotificationRequested = + params.runStatus === "error" && + params.job.delivery?.bestEffort !== true && + resolveFailureDestination(params.job, params.globalFailureDestination) !== null; + if (!primaryDeliveryRequested) { + return { + status: "not-requested", + failureNotification: { + status: alternateFailureNotificationRequested ? "unknown" : "not-requested", + }, + }; + } + if (params.runStatus === "error") { + const failureNotification: CronFailureNotificationDelivery = + alternateFailureNotificationRequested ? { status: "unknown" } : { status: "delivered" }; + if (params.delivered === true) { + return { + delivered: false, + status: "not-delivered", + error: params.error, + failureNotification: alternateFailureNotificationRequested + ? failureNotification + : { delivered: true, status: "delivered" }, + }; + } + if (params.delivered === false) { + return { + delivered: false, + status: "not-delivered", + error: params.error, + failureNotification: alternateFailureNotificationRequested + ? failureNotification + : { + delivered: false, + status: "not-delivered", + ...(params.error ? { error: params.error } : {}), + }, + }; + } + return { + status: "unknown", + error: params.error, + failureNotification: { status: "unknown" }, + }; } if (params.delivered === true) { - return { delivered: true, status: "delivered" }; + return { + delivered: true, + status: "delivered", + failureNotification: { status: "not-requested" }, + }; } if (params.delivered === false) { - return { delivered: false, status: "not-delivered" }; + return { + delivered: false, + status: "not-delivered", + error: params.error, + failureNotification: { status: "not-requested" }, + }; } - return { status: "unknown" }; + return { status: "unknown", failureNotification: { status: "not-requested" } }; +} + +export function failureNotificationDeliveryFromJobState( + job: CronJob, +): CronFailureNotificationDelivery | undefined { + const status = job.state.lastFailureNotificationDeliveryStatus; + if (!status || status === "not-requested") { + return undefined; + } + return { + delivered: job.state.lastFailureNotificationDelivered, + status, + error: job.state.lastFailureNotificationDeliveryError, + }; } function normalizeCronMessageChannel(input: unknown): CronMessageChannel | undefined { @@ -757,11 +832,22 @@ export function applyJobResult( "cron: job run returned error status", ); } - const deliveryState = resolveDeliveryState({ job, delivered: result.delivered }); + const deliveryState = resolveDeliveryState({ + job, + runStatus: result.status, + delivered: result.delivered, + error: result.error, + globalFailureDestination: state.deps.cronConfig?.failureDestination, + }); job.state.lastDelivered = deliveryState.delivered; job.state.lastDeliveryStatus = deliveryState.status; job.state.lastDeliveryError = - deliveryState.status === "not-delivered" && result.error ? result.error : undefined; + deliveryState.status === "not-delivered" && deliveryState.error + ? deliveryState.error + : undefined; + job.state.lastFailureNotificationDelivered = deliveryState.failureNotification.delivered; + job.state.lastFailureNotificationDeliveryStatus = deliveryState.failureNotification.status; + job.state.lastFailureNotificationDeliveryError = deliveryState.failureNotification.error; job.updatedAtMs = result.endedAt; // Track consecutive errors for backoff / auto-disable; skipped runs use a @@ -1825,9 +1911,10 @@ function emitJobFinished( error: result.error, summary: result.summary, diagnostics: result.diagnostics, - delivered: result.delivered, + delivered: job.state.lastDelivered, deliveryStatus: job.state.lastDeliveryStatus, deliveryError: job.state.lastDeliveryError, + failureNotificationDelivery: failureNotificationDeliveryFromJobState(job), delivery: result.delivery, sessionId: result.sessionId, sessionKey: result.sessionKey, diff --git a/src/cron/types.ts b/src/cron/types.ts index 4227456654e..21dffb4b2b3 100644 --- a/src/cron/types.ts +++ b/src/cron/types.ts @@ -70,6 +70,13 @@ export type CronDeliveryTrace = { delivered?: boolean; }; +export type CronFailureNotificationDelivery = { + /** Whether the last failed run's failure notification reached the target channel. */ + delivered?: boolean; + status: CronDeliveryStatus; + error?: string; +}; + export type CronDeliveryPreview = { label: string; detail: string; @@ -220,6 +227,12 @@ export type CronJobState = { lastDeliveryError?: string; /** Whether the last run's output was delivered to the target channel. */ lastDelivered?: boolean; + /** Whether the last failed run's failure notification was delivered to the target channel. */ + lastFailureNotificationDelivered?: boolean; + /** Delivery outcome for the last failed run's failure notification. */ + lastFailureNotificationDeliveryStatus?: CronDeliveryStatus; + /** Delivery-specific error for the last failed run's failure notification. */ + lastFailureNotificationDeliveryError?: string; }; export type CronJob = CronJobBase< diff --git a/src/gateway/protocol/schema/cron.ts b/src/gateway/protocol/schema/cron.ts index deff5cc5b84..5ff6810e54e 100644 --- a/src/gateway/protocol/schema/cron.ts +++ b/src/gateway/protocol/schema/cron.ts @@ -271,6 +271,15 @@ export const CronDeliveryPatchSchema = Type.Object( { additionalProperties: false }, ); +const CronFailureNotificationDeliverySchema = Type.Object( + { + delivered: Type.Optional(Type.Boolean()), + status: CronDeliveryStatusSchema, + error: Type.Optional(Type.String()), + }, + { additionalProperties: false }, +); + export const CronJobStateSchema = Type.Object( { nextRunAtMs: Type.Optional(Type.Integer({ minimum: 0 })), @@ -288,6 +297,9 @@ export const CronJobStateSchema = Type.Object( lastDelivered: Type.Optional(Type.Boolean()), lastDeliveryStatus: Type.Optional(CronDeliveryStatusSchema), lastDeliveryError: Type.Optional(Type.String()), + lastFailureNotificationDelivered: Type.Optional(Type.Boolean()), + lastFailureNotificationDeliveryStatus: Type.Optional(CronDeliveryStatusSchema), + lastFailureNotificationDeliveryError: Type.Optional(Type.String()), lastFailureAlertAtMs: Type.Optional(Type.Integer({ minimum: 0 })), }, { additionalProperties: false }, @@ -308,6 +320,9 @@ const CronJobStatePatchSchema = Type.Object( lastDelivered: Type.Optional(Type.Boolean()), lastDeliveryStatus: Type.Optional(CronDeliveryStatusSchema), lastDeliveryError: Type.Optional(Type.String()), + lastFailureNotificationDelivered: Type.Optional(Type.Boolean()), + lastFailureNotificationDeliveryStatus: Type.Optional(CronDeliveryStatusSchema), + lastFailureNotificationDeliveryError: Type.Optional(Type.String()), lastFailureAlertAtMs: Type.Optional(Type.Integer({ minimum: 0 })), }, { additionalProperties: false }, @@ -423,6 +438,7 @@ export const CronRunLogEntrySchema = Type.Object( delivered: Type.Optional(Type.Boolean()), deliveryStatus: Type.Optional(CronDeliveryStatusSchema), deliveryError: Type.Optional(Type.String()), + failureNotificationDelivery: Type.Optional(CronFailureNotificationDeliverySchema), sessionId: Type.Optional(NonEmptyString), sessionKey: Type.Optional(NonEmptyString), runId: Type.Optional(NonEmptyString), diff --git a/src/gateway/server-cron.ts b/src/gateway/server-cron.ts index eb39151397f..2ee4cb656b8 100644 --- a/src/gateway/server-cron.ts +++ b/src/gateway/server-cron.ts @@ -100,6 +100,12 @@ function toPluginCronJob(job: CronJob): PluginHookGatewayCronJob { lastRunStatus: job.state.lastRunStatus, lastError: job.state.lastError, lastDurationMs: job.state.lastDurationMs, + lastDelivered: job.state.lastDelivered, + lastDeliveryStatus: job.state.lastDeliveryStatus, + lastDeliveryError: job.state.lastDeliveryError, + lastFailureNotificationDelivered: job.state.lastFailureNotificationDelivered, + lastFailureNotificationDeliveryStatus: job.state.lastFailureNotificationDeliveryStatus, + lastFailureNotificationDeliveryError: job.state.lastFailureNotificationDeliveryError, }, createdAtMs: job.createdAtMs, updatedAtMs: job.updatedAtMs, @@ -464,6 +470,7 @@ export function buildGatewayCronService(params: { delivered: evt.delivered, deliveryStatus: evt.deliveryStatus, deliveryError: evt.deliveryError, + failureNotificationDelivery: evt.failureNotificationDelivery, delivery: evt.delivery, sessionId: evt.sessionId, sessionKey: evt.sessionKey, diff --git a/src/plugins/hook-types.ts b/src/plugins/hook-types.ts index 58aec3e3080..41ef3ef87cc 100644 --- a/src/plugins/hook-types.ts +++ b/src/plugins/hook-types.ts @@ -666,6 +666,12 @@ export type PluginHookGatewayCronJobState = { lastRunStatus?: PluginHookGatewayCronRunStatus; lastError?: string; lastDurationMs?: number; + lastDelivered?: boolean; + lastDeliveryStatus?: PluginHookGatewayCronDeliveryStatus; + lastDeliveryError?: string; + lastFailureNotificationDelivered?: boolean; + lastFailureNotificationDeliveryStatus?: PluginHookGatewayCronDeliveryStatus; + lastFailureNotificationDeliveryError?: string; }; export type PluginHookGatewayCronJob = { diff --git a/ui/src/ui/types.ts b/ui/src/ui/types.ts index 55ffdf1885a..6142b73c085 100644 --- a/ui/src/ui/types.ts +++ b/ui/src/ui/types.ts @@ -594,6 +594,9 @@ export type CronJobState = { lastDelivered?: boolean; lastDeliveryStatus?: CronDeliveryStatus; lastDeliveryError?: string; + lastFailureNotificationDelivered?: boolean; + lastFailureNotificationDeliveryStatus?: CronDeliveryStatus; + lastFailureNotificationDeliveryError?: string; lastFailureAlertAtMs?: number; };