mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-18 13:04:47 +00:00
fix(cron): separate failure notification delivery
This commit is contained in:
@@ -26,6 +26,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Plugin releases: require external package compatibility metadata in the npm plugin publish plan, matching the ClawHub package contract before packages ship.
|
||||
- Agents/OpenAI-compatible: honor per-model `max_completion_tokens`/`max_tokens` params in embedded OpenAI-completions runs so high-token Kimi-style routes keep their configured completion cap. Fixes #82230. Thanks @albert-zen.
|
||||
- Agents/local: install a local gateway request scope around trusted `openclaw agent --local` runs, so subagent completion announces can use in-process gateway dispatch without crashing. Fixes #82140. Thanks @Kushmaro.
|
||||
- Cron: keep failed isolated-agent runs from marking successful result delivery when only the failure notification was delivered. Fixes #72985. Thanks @Allenbluff.
|
||||
- Discord: validate message-read results before normalizing channel history and report unexpected payloads with a Discord boundary error instead of `map is not a function`. Fixes #82252. Thanks @jessewunderlich.
|
||||
- Agents/runtime: apply `agents.defaults.models["provider/*"].agentRuntime` as provider-wide model runtime policy while preserving exact model runtime precedence. Fixes #82243. Thanks @rendrag-git.
|
||||
- Agents/auto-reply: restrict `NO_REPLY` prompt guidance to automatic group/channel replies, remove legacy silent-reply rewrites, and suppress accidental direct-chat silent tokens instead of delivering fallback text. Fixes #82254. Thanks @absol89.
|
||||
|
||||
@@ -5293,6 +5293,7 @@ public struct CronRunLogEntry: Codable, Sendable {
|
||||
public let delivered: Bool?
|
||||
public let deliverystatus: AnyCodable?
|
||||
public let deliveryerror: String?
|
||||
public let failurenotificationdelivery: [String: AnyCodable]?
|
||||
public let sessionid: String?
|
||||
public let sessionkey: String?
|
||||
public let runid: String?
|
||||
@@ -5315,6 +5316,7 @@ public struct CronRunLogEntry: Codable, Sendable {
|
||||
delivered: Bool?,
|
||||
deliverystatus: AnyCodable?,
|
||||
deliveryerror: String?,
|
||||
failurenotificationdelivery: [String: AnyCodable]? = nil,
|
||||
sessionid: String?,
|
||||
sessionkey: String?,
|
||||
runid: String?,
|
||||
@@ -5336,6 +5338,7 @@ public struct CronRunLogEntry: Codable, Sendable {
|
||||
self.delivered = delivered
|
||||
self.deliverystatus = deliverystatus
|
||||
self.deliveryerror = deliveryerror
|
||||
self.failurenotificationdelivery = failurenotificationdelivery
|
||||
self.sessionid = sessionid
|
||||
self.sessionkey = sessionkey
|
||||
self.runid = runid
|
||||
@@ -5359,6 +5362,7 @@ public struct CronRunLogEntry: Codable, Sendable {
|
||||
case delivered
|
||||
case deliverystatus = "deliveryStatus"
|
||||
case deliveryerror = "deliveryError"
|
||||
case failurenotificationdelivery = "failureNotificationDelivery"
|
||||
case sessionid = "sessionId"
|
||||
case sessionkey = "sessionKey"
|
||||
case runid = "runId"
|
||||
|
||||
@@ -42,6 +42,7 @@ const STRICT_LITERAL_STRUCTS = new Set([
|
||||
|
||||
const DEFAULTED_OPTIONAL_INIT_PARAMS: Record<string, Set<string>> = {
|
||||
MessageActionParams: new Set(["inboundTurnKind"]),
|
||||
CronRunLogEntry: new Set(["failureNotificationDelivery"]),
|
||||
};
|
||||
|
||||
const header = `// Generated by scripts/protocol-gen-swift.ts — do not edit by hand\n// swiftlint:disable file_length\nimport Foundation\n\npublic let GATEWAY_PROTOCOL_VERSION = ${PROTOCOL_VERSION}\npublic let GATEWAY_MIN_PROTOCOL_VERSION = ${MIN_CLIENT_PROTOCOL_VERSION}\n\nprivate struct GatewayAnyCodingKey: CodingKey, Hashable {\n let stringValue: String\n let intValue: Int?\n\n init?(stringValue: String) {\n self.stringValue = stringValue\n self.intValue = nil\n }\n\n init?(intValue: Int) {\n self.stringValue = String(intValue)\n self.intValue = intValue\n }\n}\n\npublic enum ErrorCode: String, Codable, Sendable {\n${Object.values(
|
||||
|
||||
@@ -15,4 +15,11 @@ describe("cron protocol schema", () => {
|
||||
}
|
||||
expect(lastStatus.deprecated).toBe(true);
|
||||
});
|
||||
|
||||
it("exposes failure-notification delivery state", () => {
|
||||
const properties = (CronJobStateSchema as SchemaLike).properties ?? {};
|
||||
expect(properties.lastFailureNotificationDelivered).toBeDefined();
|
||||
expect(properties.lastFailureNotificationDeliveryStatus).toBeDefined();
|
||||
expect(properties.lastFailureNotificationDeliveryError).toBeDefined();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -242,6 +242,10 @@ describe("cron run log", () => {
|
||||
delivered: true,
|
||||
deliveryStatus: "not-delivered",
|
||||
deliveryError: "announce failed",
|
||||
failureNotificationDelivery: {
|
||||
delivered: true,
|
||||
status: "delivered",
|
||||
},
|
||||
delivery: {
|
||||
intended: { channel: "last", to: null, source: "last" },
|
||||
resolved: { ok: true, channel: "telegram", to: "-100", source: "last" },
|
||||
@@ -260,6 +264,10 @@ describe("cron run log", () => {
|
||||
expect(entries[0]?.delivered).toBe(true);
|
||||
expect(entries[0]?.deliveryStatus).toBe("not-delivered");
|
||||
expect(entries[0]?.deliveryError).toBe("announce failed");
|
||||
expect(entries[0]?.failureNotificationDelivery).toEqual({
|
||||
delivered: true,
|
||||
status: "delivered",
|
||||
});
|
||||
expect(entries[0]?.delivery).toEqual({
|
||||
intended: { channel: "last", to: null, source: "last" },
|
||||
resolved: { ok: true, channel: "telegram", to: "-100", source: "last" },
|
||||
|
||||
@@ -14,6 +14,7 @@ import { normalizeCronRunDiagnostics } from "./run-diagnostics.js";
|
||||
import type {
|
||||
CronDeliveryStatus,
|
||||
CronDeliveryTrace,
|
||||
CronFailureNotificationDelivery,
|
||||
CronRunDiagnostics,
|
||||
CronRunStatus,
|
||||
CronRunTelemetry,
|
||||
@@ -30,6 +31,7 @@ export type CronRunLogEntry = {
|
||||
delivered?: boolean;
|
||||
deliveryStatus?: CronDeliveryStatus;
|
||||
deliveryError?: string;
|
||||
failureNotificationDelivery?: CronFailureNotificationDelivery;
|
||||
delivery?: CronDeliveryTrace;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
@@ -350,6 +352,29 @@ function parseAllRunLogEntries(raw: string, opts?: { jobId?: string }): CronRunL
|
||||
if (typeof obj.deliveryError === "string") {
|
||||
entry.deliveryError = obj.deliveryError;
|
||||
}
|
||||
if (obj.failureNotificationDelivery && typeof obj.failureNotificationDelivery === "object") {
|
||||
const failureNotificationDelivery = obj.failureNotificationDelivery as {
|
||||
delivered?: unknown;
|
||||
status?: unknown;
|
||||
error?: unknown;
|
||||
};
|
||||
if (
|
||||
failureNotificationDelivery.status === "delivered" ||
|
||||
failureNotificationDelivery.status === "not-delivered" ||
|
||||
failureNotificationDelivery.status === "unknown" ||
|
||||
failureNotificationDelivery.status === "not-requested"
|
||||
) {
|
||||
entry.failureNotificationDelivery = {
|
||||
status: failureNotificationDelivery.status,
|
||||
...(typeof failureNotificationDelivery.delivered === "boolean"
|
||||
? { delivered: failureNotificationDelivery.delivered }
|
||||
: {}),
|
||||
...(typeof failureNotificationDelivery.error === "string"
|
||||
? { error: failureNotificationDelivery.error }
|
||||
: {}),
|
||||
};
|
||||
}
|
||||
}
|
||||
if (obj.delivery && typeof obj.delivery === "object") {
|
||||
entry.delivery = obj.delivery;
|
||||
}
|
||||
|
||||
@@ -33,6 +33,48 @@ function buildAnnounceIsolatedAgentTurnJob(name: string): CronAddInput {
|
||||
};
|
||||
}
|
||||
|
||||
function buildAnnounceWithFailureDestinationJob(name: string): CronAddInput {
|
||||
return {
|
||||
...buildAnnounceIsolatedAgentTurnJob(name),
|
||||
delivery: {
|
||||
mode: "announce",
|
||||
channel: "forum",
|
||||
to: "123",
|
||||
failureDestination: {
|
||||
mode: "webhook",
|
||||
to: "https://example.invalid/cron-failure",
|
||||
},
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function buildFailureDestinationOnlyJob(name: string): CronAddInput {
|
||||
return {
|
||||
...buildIsolatedAgentTurnJob(name),
|
||||
delivery: {
|
||||
mode: "none",
|
||||
failureDestination: {
|
||||
mode: "webhook",
|
||||
to: "https://example.invalid/cron-failure",
|
||||
},
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function buildBestEffortFailureDestinationOnlyJob(name: string): CronAddInput {
|
||||
return {
|
||||
...buildFailureDestinationOnlyJob(name),
|
||||
delivery: {
|
||||
mode: "none",
|
||||
bestEffort: true,
|
||||
failureDestination: {
|
||||
mode: "webhook",
|
||||
to: "https://example.invalid/cron-failure",
|
||||
},
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function buildMainSessionSystemEventJob(name: string): CronAddInput {
|
||||
return {
|
||||
name,
|
||||
@@ -46,9 +88,19 @@ function buildMainSessionSystemEventJob(name: string): CronAddInput {
|
||||
|
||||
function createIsolatedCronWithFinishedBarrier(params: {
|
||||
storePath: string;
|
||||
status?: "ok" | "error";
|
||||
delivered?: boolean;
|
||||
error?: string;
|
||||
onFinished?: (evt: { jobId: string; delivered?: boolean; deliveryStatus?: string }) => void;
|
||||
onFinished?: (evt: {
|
||||
jobId: string;
|
||||
delivered?: boolean;
|
||||
deliveryStatus?: string;
|
||||
failureNotificationDelivery?: {
|
||||
delivered?: boolean;
|
||||
status: string;
|
||||
error?: string;
|
||||
};
|
||||
}) => void;
|
||||
}) {
|
||||
const finished = createFinishedBarrier();
|
||||
const cron = new CronService({
|
||||
@@ -58,7 +110,7 @@ function createIsolatedCronWithFinishedBarrier(params: {
|
||||
enqueueSystemEvent: vi.fn(),
|
||||
requestHeartbeat: vi.fn(),
|
||||
runIsolatedAgentJob: vi.fn(async () => ({
|
||||
status: "ok" as const,
|
||||
status: params.status ?? ("ok" as const),
|
||||
summary: "done",
|
||||
...(params.error === undefined ? {} : { error: params.error }),
|
||||
...(params.delivered === undefined ? {} : { delivered: params.delivered }),
|
||||
@@ -69,6 +121,7 @@ function createIsolatedCronWithFinishedBarrier(params: {
|
||||
jobId: evt.jobId,
|
||||
delivered: evt.delivered,
|
||||
deliveryStatus: evt.deliveryStatus,
|
||||
failureNotificationDelivery: evt.failureNotificationDelivery,
|
||||
});
|
||||
}
|
||||
finished.onEvent(evt);
|
||||
@@ -81,11 +134,13 @@ async function runSingleJobAndReadState(params: {
|
||||
cron: CronService;
|
||||
finished: ReturnType<typeof createFinishedBarrier>;
|
||||
job: CronAddInput;
|
||||
waitForFinished?: (jobId: string) => Promise<unknown>;
|
||||
}) {
|
||||
const job = await params.cron.add(params.job);
|
||||
const finishedPromise = params.waitForFinished?.(job.id) ?? params.finished.waitForOk(job.id);
|
||||
vi.setSystemTime(new Date(job.state.nextRunAtMs! + 5));
|
||||
await vi.runOnlyPendingTimersAsync();
|
||||
await params.finished.waitForOk(job.id);
|
||||
await finishedPromise;
|
||||
|
||||
const jobs = await params.cron.list({ includeDisabled: true });
|
||||
return { job, updated: jobs.find((entry) => entry.id === job.id) };
|
||||
@@ -113,6 +168,9 @@ function expectDeliveryNotRequested(
|
||||
lastDelivered?: boolean;
|
||||
lastDeliveryStatus?: string;
|
||||
lastDeliveryError?: string;
|
||||
lastFailureNotificationDelivered?: boolean;
|
||||
lastFailureNotificationDeliveryStatus?: string;
|
||||
lastFailureNotificationDeliveryError?: string;
|
||||
};
|
||||
}
|
||||
| undefined,
|
||||
@@ -121,20 +179,38 @@ function expectDeliveryNotRequested(
|
||||
expect(updated?.state.lastDelivered).toBeUndefined();
|
||||
expect(updated?.state.lastDeliveryStatus).toBe("not-requested");
|
||||
expect(updated?.state.lastDeliveryError).toBeUndefined();
|
||||
expect(updated?.state.lastFailureNotificationDelivered).toBeUndefined();
|
||||
expect(updated?.state.lastFailureNotificationDeliveryStatus).toBe("not-requested");
|
||||
expect(updated?.state.lastFailureNotificationDeliveryError).toBeUndefined();
|
||||
}
|
||||
|
||||
async function runIsolatedJobAndReadState(params: {
|
||||
job: CronAddInput;
|
||||
status?: "ok" | "error";
|
||||
delivered?: boolean;
|
||||
error?: string;
|
||||
onFinished?: (evt: { jobId: string; delivered?: boolean; deliveryStatus?: string }) => void;
|
||||
onFinished?: (evt: {
|
||||
jobId: string;
|
||||
delivered?: boolean;
|
||||
deliveryStatus?: string;
|
||||
failureNotificationDelivery?: {
|
||||
delivered?: boolean;
|
||||
status: string;
|
||||
error?: string;
|
||||
};
|
||||
}) => void;
|
||||
}) {
|
||||
const store = await makeStorePath();
|
||||
const finishedEvents = new Map<string, (evt: unknown) => void>();
|
||||
const { cron, finished } = createIsolatedCronWithFinishedBarrier({
|
||||
storePath: store.storePath,
|
||||
...(params.status !== undefined ? { status: params.status } : {}),
|
||||
...(params.delivered !== undefined ? { delivered: params.delivered } : {}),
|
||||
...(params.error !== undefined ? { error: params.error } : {}),
|
||||
...(params.onFinished ? { onFinished: params.onFinished } : {}),
|
||||
onFinished: (evt) => {
|
||||
params.onFinished?.(evt);
|
||||
finishedEvents.get(evt.jobId)?.(evt);
|
||||
},
|
||||
});
|
||||
|
||||
await cron.start();
|
||||
@@ -143,6 +219,10 @@ async function runIsolatedJobAndReadState(params: {
|
||||
cron,
|
||||
finished,
|
||||
job: params.job,
|
||||
waitForFinished: (jobId) =>
|
||||
new Promise((resolve) => {
|
||||
finishedEvents.set(jobId, resolve);
|
||||
}),
|
||||
});
|
||||
return updated;
|
||||
} finally {
|
||||
@@ -160,6 +240,8 @@ describe("CronService persists delivered status", () => {
|
||||
expect(updated?.state.lastDelivered).toBe(true);
|
||||
expect(updated?.state.lastDeliveryStatus).toBe("delivered");
|
||||
expect(updated?.state.lastDeliveryError).toBeUndefined();
|
||||
expect(updated?.state.lastFailureNotificationDelivered).toBeUndefined();
|
||||
expect(updated?.state.lastFailureNotificationDeliveryStatus).toBe("not-requested");
|
||||
});
|
||||
|
||||
it("persists lastDelivered=false when isolated job explicitly reports not delivered", async () => {
|
||||
@@ -171,6 +253,136 @@ describe("CronService persists delivered status", () => {
|
||||
expect(updated?.state.lastDelivered).toBe(false);
|
||||
expect(updated?.state.lastDeliveryStatus).toBe("not-delivered");
|
||||
expect(updated?.state.lastDeliveryError).toBeUndefined();
|
||||
expect(updated?.state.lastFailureNotificationDelivered).toBeUndefined();
|
||||
expect(updated?.state.lastFailureNotificationDeliveryStatus).toBe("not-requested");
|
||||
});
|
||||
|
||||
it("keeps failure notification delivery separate from successful result delivery", async () => {
|
||||
let capturedEvent:
|
||||
| {
|
||||
delivered?: boolean;
|
||||
deliveryStatus?: string;
|
||||
failureNotificationDelivery?: {
|
||||
delivered?: boolean;
|
||||
status: string;
|
||||
error?: string;
|
||||
};
|
||||
}
|
||||
| undefined;
|
||||
const updated = await runIsolatedJobAndReadState({
|
||||
job: buildAnnounceIsolatedAgentTurnJob("error-notification-delivered"),
|
||||
status: "error",
|
||||
delivered: true,
|
||||
error: "Agent couldn't generate a response.",
|
||||
onFinished: (evt) => {
|
||||
capturedEvent = evt;
|
||||
},
|
||||
});
|
||||
|
||||
expect(updated?.state.lastRunStatus).toBe("error");
|
||||
expect(updated?.state.lastDelivered).toBe(false);
|
||||
expect(updated?.state.lastDeliveryStatus).toBe("not-delivered");
|
||||
expect(updated?.state.lastDeliveryError).toBe("Agent couldn't generate a response.");
|
||||
expect(updated?.state.lastFailureNotificationDelivered).toBe(true);
|
||||
expect(updated?.state.lastFailureNotificationDeliveryStatus).toBe("delivered");
|
||||
expect(updated?.state.lastFailureNotificationDeliveryError).toBeUndefined();
|
||||
expect(capturedEvent?.delivered).toBe(false);
|
||||
expect(capturedEvent?.deliveryStatus).toBe("not-delivered");
|
||||
expect(capturedEvent?.failureNotificationDelivery).toEqual({
|
||||
delivered: true,
|
||||
status: "delivered",
|
||||
});
|
||||
});
|
||||
|
||||
it("marks failure-destination-only error notification delivery unknown", async () => {
|
||||
let capturedEvent:
|
||||
| {
|
||||
delivered?: boolean;
|
||||
deliveryStatus?: string;
|
||||
failureNotificationDelivery?: {
|
||||
delivered?: boolean;
|
||||
status: string;
|
||||
error?: string;
|
||||
};
|
||||
}
|
||||
| undefined;
|
||||
const updated = await runIsolatedJobAndReadState({
|
||||
job: buildFailureDestinationOnlyJob("failure-destination-only"),
|
||||
status: "error",
|
||||
error: "Agent couldn't generate a response.",
|
||||
onFinished: (evt) => {
|
||||
capturedEvent = evt;
|
||||
},
|
||||
});
|
||||
|
||||
expect(updated?.state.lastRunStatus).toBe("error");
|
||||
expect(updated?.state.lastDelivered).toBeUndefined();
|
||||
expect(updated?.state.lastDeliveryStatus).toBe("not-requested");
|
||||
expect(updated?.state.lastFailureNotificationDelivered).toBeUndefined();
|
||||
expect(updated?.state.lastFailureNotificationDeliveryStatus).toBe("unknown");
|
||||
expect(capturedEvent?.delivered).toBeUndefined();
|
||||
expect(capturedEvent?.deliveryStatus).toBe("not-requested");
|
||||
expect(capturedEvent?.failureNotificationDelivery).toEqual({ status: "unknown" });
|
||||
});
|
||||
|
||||
it("does not treat primary error delivery as alternate failure-destination delivery", async () => {
|
||||
let capturedEvent:
|
||||
| {
|
||||
delivered?: boolean;
|
||||
deliveryStatus?: string;
|
||||
failureNotificationDelivery?: {
|
||||
delivered?: boolean;
|
||||
status: string;
|
||||
error?: string;
|
||||
};
|
||||
}
|
||||
| undefined;
|
||||
const updated = await runIsolatedJobAndReadState({
|
||||
job: buildAnnounceWithFailureDestinationJob("announce-plus-failure-destination"),
|
||||
status: "error",
|
||||
delivered: true,
|
||||
error: "Agent couldn't generate a response.",
|
||||
onFinished: (evt) => {
|
||||
capturedEvent = evt;
|
||||
},
|
||||
});
|
||||
|
||||
expect(updated?.state.lastRunStatus).toBe("error");
|
||||
expect(updated?.state.lastDelivered).toBe(false);
|
||||
expect(updated?.state.lastDeliveryStatus).toBe("not-delivered");
|
||||
expect(updated?.state.lastFailureNotificationDelivered).toBeUndefined();
|
||||
expect(updated?.state.lastFailureNotificationDeliveryStatus).toBe("unknown");
|
||||
expect(capturedEvent?.delivered).toBe(false);
|
||||
expect(capturedEvent?.failureNotificationDelivery).toEqual({ status: "unknown" });
|
||||
});
|
||||
|
||||
it("keeps best-effort failure destinations suppressed", async () => {
|
||||
let capturedEvent:
|
||||
| {
|
||||
delivered?: boolean;
|
||||
deliveryStatus?: string;
|
||||
failureNotificationDelivery?: {
|
||||
delivered?: boolean;
|
||||
status: string;
|
||||
error?: string;
|
||||
};
|
||||
}
|
||||
| undefined;
|
||||
const updated = await runIsolatedJobAndReadState({
|
||||
job: buildBestEffortFailureDestinationOnlyJob("best-effort-failure-destination-only"),
|
||||
status: "error",
|
||||
error: "Agent couldn't generate a response.",
|
||||
onFinished: (evt) => {
|
||||
capturedEvent = evt;
|
||||
},
|
||||
});
|
||||
|
||||
expect(updated?.state.lastRunStatus).toBe("error");
|
||||
expect(updated?.state.lastDeliveryStatus).toBe("not-requested");
|
||||
expect(updated?.state.lastFailureNotificationDelivered).toBeUndefined();
|
||||
expect(updated?.state.lastFailureNotificationDeliveryStatus).toBe("not-requested");
|
||||
expect(capturedEvent?.deliveryStatus).toBe("not-requested");
|
||||
expect(capturedEvent?.failureNotificationDelivery).toBeUndefined();
|
||||
});
|
||||
|
||||
it("suppresses delivered=false when delivery.mode none opts out of delivery", async () => {
|
||||
|
||||
@@ -72,6 +72,8 @@ function createInterruptedMainJob(now: number): CronJob {
|
||||
state: {
|
||||
nextRunAtMs: now - 60_000,
|
||||
runningAtMs: now - 30 * 60_000,
|
||||
lastFailureNotificationDelivered: true,
|
||||
lastFailureNotificationDeliveryStatus: "delivered",
|
||||
},
|
||||
};
|
||||
}
|
||||
@@ -198,6 +200,9 @@ describe("cron service ops seam coverage", () => {
|
||||
expect(job.state.lastRunStatus).toBe("error");
|
||||
expect(job.state.lastRunAtMs).toBe(now - 30 * 60_000);
|
||||
expect(job.state.lastError).toBe("cron: job interrupted by gateway restart");
|
||||
expect(job.state.lastFailureNotificationDelivered).toBeUndefined();
|
||||
expect(job.state.lastFailureNotificationDeliveryStatus).toBe("not-requested");
|
||||
expect(job.state.lastFailureNotificationDeliveryError).toBeUndefined();
|
||||
expect((job.state.nextRunAtMs ?? 0) > now).toBe(true);
|
||||
|
||||
const delays = timeoutSpy.mock.calls
|
||||
|
||||
@@ -8,6 +8,7 @@ import {
|
||||
failTaskRunByRunId,
|
||||
} from "../../tasks/detached-task-runtime.js";
|
||||
import { clearCronJobActive, markCronJobActive } from "../active-jobs.js";
|
||||
import { resolveCronDeliveryPlan, resolveFailureDestination } from "../delivery-plan.js";
|
||||
import { createCronRunDiagnosticsFromError } from "../run-diagnostics.js";
|
||||
import { createCronExecutionId } from "../run-id.js";
|
||||
import type { CronJob, CronJobCreate, CronJobPatch } from "../types.js";
|
||||
@@ -40,6 +41,7 @@ import {
|
||||
armTimer,
|
||||
emit,
|
||||
executeJobCoreWithTimeout,
|
||||
failureNotificationDeliveryFromJobState,
|
||||
normalizeCronRunErrorText,
|
||||
runMissedJobs,
|
||||
stopTimer,
|
||||
@@ -54,6 +56,20 @@ type InterruptedStartupRun = {
|
||||
durationMs: number;
|
||||
};
|
||||
|
||||
function resolveInterruptedStartupFailureNotificationStatus(params: {
|
||||
state: CronServiceState;
|
||||
job: CronJob;
|
||||
}) {
|
||||
if (params.job.delivery?.bestEffort === true) {
|
||||
return "not-requested";
|
||||
}
|
||||
if (resolveFailureDestination(params.job, params.state.deps.cronConfig?.failureDestination)) {
|
||||
return "unknown";
|
||||
}
|
||||
const primaryPlan = resolveCronDeliveryPlan(params.job);
|
||||
return primaryPlan.mode === "announce" && primaryPlan.requested ? "unknown" : "not-requested";
|
||||
}
|
||||
|
||||
function markInterruptedStartupRun(params: {
|
||||
state: CronServiceState;
|
||||
job: CronJob;
|
||||
@@ -61,6 +77,10 @@ function markInterruptedStartupRun(params: {
|
||||
nowMs: number;
|
||||
}): InterruptedStartupRun {
|
||||
const { job, runningAtMs, nowMs } = params;
|
||||
const failureNotificationStatus = resolveInterruptedStartupFailureNotificationStatus({
|
||||
state: params.state,
|
||||
job,
|
||||
});
|
||||
const previousErrors =
|
||||
typeof job.state.consecutiveErrors === "number" && Number.isFinite(job.state.consecutiveErrors)
|
||||
? Math.max(0, Math.floor(job.state.consecutiveErrors))
|
||||
@@ -81,6 +101,9 @@ function markInterruptedStartupRun(params: {
|
||||
job.state.lastDelivered = false;
|
||||
job.state.lastDeliveryStatus = "unknown";
|
||||
job.state.lastDeliveryError = STARTUP_INTERRUPTED_ERROR;
|
||||
job.state.lastFailureNotificationDelivered = undefined;
|
||||
job.state.lastFailureNotificationDeliveryStatus = failureNotificationStatus;
|
||||
job.state.lastFailureNotificationDeliveryError = undefined;
|
||||
job.state.nextRunAtMs = undefined;
|
||||
job.updatedAtMs = nowMs;
|
||||
|
||||
@@ -194,6 +217,7 @@ export async function start(state: CronServiceState) {
|
||||
delivered: false,
|
||||
deliveryStatus: "unknown",
|
||||
deliveryError: STARTUP_INTERRUPTED_ERROR,
|
||||
failureNotificationDelivery: job ? failureNotificationDeliveryFromJobState(job) : undefined,
|
||||
runAtMs: interrupted.runAtMs,
|
||||
durationMs: interrupted.durationMs,
|
||||
nextRunAtMs: job?.state.nextRunAtMs,
|
||||
@@ -524,6 +548,7 @@ async function skipInvalidPersistedManualRun(params: {
|
||||
nextRunAtMs: params.job.state.nextRunAtMs,
|
||||
deliveryStatus: params.job.state.lastDeliveryStatus,
|
||||
deliveryError: params.job.state.lastDeliveryError,
|
||||
failureNotificationDelivery: failureNotificationDeliveryFromJobState(params.job),
|
||||
});
|
||||
|
||||
if (shouldDelete && params.state.store) {
|
||||
@@ -762,9 +787,10 @@ async function finishPreparedManualRun(
|
||||
error: coreResult.error,
|
||||
summary: coreResult.summary,
|
||||
diagnostics: coreResult.diagnostics,
|
||||
delivered: coreResult.delivered,
|
||||
delivered: job.state.lastDelivered,
|
||||
deliveryStatus: job.state.lastDeliveryStatus,
|
||||
deliveryError: job.state.lastDeliveryError,
|
||||
failureNotificationDelivery: failureNotificationDeliveryFromJobState(job),
|
||||
delivery: coreResult.delivery,
|
||||
sessionId: coreResult.sessionId,
|
||||
sessionKey: coreResult.sessionKey,
|
||||
|
||||
@@ -3,6 +3,7 @@ import type { HeartbeatRunResult, HeartbeatWakeRequest } from "../../infra/heart
|
||||
import type {
|
||||
CronAgentExecutionPhaseUpdate,
|
||||
CronAgentExecutionStarted,
|
||||
CronFailureNotificationDelivery,
|
||||
CronDeliveryStatus,
|
||||
CronDeliveryTrace,
|
||||
CronJob,
|
||||
@@ -30,6 +31,7 @@ export type CronEvent = {
|
||||
delivered?: boolean;
|
||||
deliveryStatus?: CronDeliveryStatus;
|
||||
deliveryError?: string;
|
||||
failureNotificationDelivery?: CronFailureNotificationDelivery;
|
||||
delivery?: CronDeliveryTrace;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
|
||||
@@ -14,7 +14,7 @@ import {
|
||||
failTaskRunByRunId,
|
||||
} from "../../tasks/detached-task-runtime.js";
|
||||
import { clearCronJobActive, markCronJobActive } from "../active-jobs.js";
|
||||
import { resolveCronDeliveryPlan } from "../delivery-plan.js";
|
||||
import { resolveCronDeliveryPlan, resolveFailureDestination } from "../delivery-plan.js";
|
||||
import {
|
||||
createCronRunDiagnosticsFromError,
|
||||
normalizeCronRunDiagnostics,
|
||||
@@ -28,6 +28,7 @@ import type {
|
||||
CronAgentExecutionStarted,
|
||||
CronDeliveryStatus,
|
||||
CronDeliveryTrace,
|
||||
CronFailureNotificationDelivery,
|
||||
CronJob,
|
||||
CronMessageChannel,
|
||||
CronRunOutcome,
|
||||
@@ -539,20 +540,94 @@ function resolveRetryConfig(cronConfig?: CronConfig) {
|
||||
};
|
||||
}
|
||||
|
||||
function resolveDeliveryState(params: { job: CronJob; delivered?: boolean }): {
|
||||
function resolveDeliveryState(params: {
|
||||
job: CronJob;
|
||||
runStatus: CronRunStatus;
|
||||
delivered?: boolean;
|
||||
error?: string;
|
||||
globalFailureDestination?: CronConfig["failureDestination"];
|
||||
}): {
|
||||
delivered?: boolean;
|
||||
status: CronDeliveryStatus;
|
||||
error?: string;
|
||||
failureNotification: CronFailureNotificationDelivery;
|
||||
} {
|
||||
if (!resolveCronDeliveryPlan(params.job).requested) {
|
||||
return { status: "not-requested" };
|
||||
const primaryDeliveryRequested = resolveCronDeliveryPlan(params.job).requested;
|
||||
const alternateFailureNotificationRequested =
|
||||
params.runStatus === "error" &&
|
||||
params.job.delivery?.bestEffort !== true &&
|
||||
resolveFailureDestination(params.job, params.globalFailureDestination) !== null;
|
||||
if (!primaryDeliveryRequested) {
|
||||
return {
|
||||
status: "not-requested",
|
||||
failureNotification: {
|
||||
status: alternateFailureNotificationRequested ? "unknown" : "not-requested",
|
||||
},
|
||||
};
|
||||
}
|
||||
if (params.runStatus === "error") {
|
||||
const failureNotification: CronFailureNotificationDelivery =
|
||||
alternateFailureNotificationRequested ? { status: "unknown" } : { status: "delivered" };
|
||||
if (params.delivered === true) {
|
||||
return {
|
||||
delivered: false,
|
||||
status: "not-delivered",
|
||||
error: params.error,
|
||||
failureNotification: alternateFailureNotificationRequested
|
||||
? failureNotification
|
||||
: { delivered: true, status: "delivered" },
|
||||
};
|
||||
}
|
||||
if (params.delivered === false) {
|
||||
return {
|
||||
delivered: false,
|
||||
status: "not-delivered",
|
||||
error: params.error,
|
||||
failureNotification: alternateFailureNotificationRequested
|
||||
? failureNotification
|
||||
: {
|
||||
delivered: false,
|
||||
status: "not-delivered",
|
||||
...(params.error ? { error: params.error } : {}),
|
||||
},
|
||||
};
|
||||
}
|
||||
return {
|
||||
status: "unknown",
|
||||
error: params.error,
|
||||
failureNotification: { status: "unknown" },
|
||||
};
|
||||
}
|
||||
if (params.delivered === true) {
|
||||
return { delivered: true, status: "delivered" };
|
||||
return {
|
||||
delivered: true,
|
||||
status: "delivered",
|
||||
failureNotification: { status: "not-requested" },
|
||||
};
|
||||
}
|
||||
if (params.delivered === false) {
|
||||
return { delivered: false, status: "not-delivered" };
|
||||
return {
|
||||
delivered: false,
|
||||
status: "not-delivered",
|
||||
error: params.error,
|
||||
failureNotification: { status: "not-requested" },
|
||||
};
|
||||
}
|
||||
return { status: "unknown" };
|
||||
return { status: "unknown", failureNotification: { status: "not-requested" } };
|
||||
}
|
||||
|
||||
export function failureNotificationDeliveryFromJobState(
|
||||
job: CronJob,
|
||||
): CronFailureNotificationDelivery | undefined {
|
||||
const status = job.state.lastFailureNotificationDeliveryStatus;
|
||||
if (!status || status === "not-requested") {
|
||||
return undefined;
|
||||
}
|
||||
return {
|
||||
delivered: job.state.lastFailureNotificationDelivered,
|
||||
status,
|
||||
error: job.state.lastFailureNotificationDeliveryError,
|
||||
};
|
||||
}
|
||||
|
||||
function normalizeCronMessageChannel(input: unknown): CronMessageChannel | undefined {
|
||||
@@ -757,11 +832,22 @@ export function applyJobResult(
|
||||
"cron: job run returned error status",
|
||||
);
|
||||
}
|
||||
const deliveryState = resolveDeliveryState({ job, delivered: result.delivered });
|
||||
const deliveryState = resolveDeliveryState({
|
||||
job,
|
||||
runStatus: result.status,
|
||||
delivered: result.delivered,
|
||||
error: result.error,
|
||||
globalFailureDestination: state.deps.cronConfig?.failureDestination,
|
||||
});
|
||||
job.state.lastDelivered = deliveryState.delivered;
|
||||
job.state.lastDeliveryStatus = deliveryState.status;
|
||||
job.state.lastDeliveryError =
|
||||
deliveryState.status === "not-delivered" && result.error ? result.error : undefined;
|
||||
deliveryState.status === "not-delivered" && deliveryState.error
|
||||
? deliveryState.error
|
||||
: undefined;
|
||||
job.state.lastFailureNotificationDelivered = deliveryState.failureNotification.delivered;
|
||||
job.state.lastFailureNotificationDeliveryStatus = deliveryState.failureNotification.status;
|
||||
job.state.lastFailureNotificationDeliveryError = deliveryState.failureNotification.error;
|
||||
job.updatedAtMs = result.endedAt;
|
||||
|
||||
// Track consecutive errors for backoff / auto-disable; skipped runs use a
|
||||
@@ -1825,9 +1911,10 @@ function emitJobFinished(
|
||||
error: result.error,
|
||||
summary: result.summary,
|
||||
diagnostics: result.diagnostics,
|
||||
delivered: result.delivered,
|
||||
delivered: job.state.lastDelivered,
|
||||
deliveryStatus: job.state.lastDeliveryStatus,
|
||||
deliveryError: job.state.lastDeliveryError,
|
||||
failureNotificationDelivery: failureNotificationDeliveryFromJobState(job),
|
||||
delivery: result.delivery,
|
||||
sessionId: result.sessionId,
|
||||
sessionKey: result.sessionKey,
|
||||
|
||||
@@ -70,6 +70,13 @@ export type CronDeliveryTrace = {
|
||||
delivered?: boolean;
|
||||
};
|
||||
|
||||
export type CronFailureNotificationDelivery = {
|
||||
/** Whether the last failed run's failure notification reached the target channel. */
|
||||
delivered?: boolean;
|
||||
status: CronDeliveryStatus;
|
||||
error?: string;
|
||||
};
|
||||
|
||||
export type CronDeliveryPreview = {
|
||||
label: string;
|
||||
detail: string;
|
||||
@@ -220,6 +227,12 @@ export type CronJobState = {
|
||||
lastDeliveryError?: string;
|
||||
/** Whether the last run's output was delivered to the target channel. */
|
||||
lastDelivered?: boolean;
|
||||
/** Whether the last failed run's failure notification was delivered to the target channel. */
|
||||
lastFailureNotificationDelivered?: boolean;
|
||||
/** Delivery outcome for the last failed run's failure notification. */
|
||||
lastFailureNotificationDeliveryStatus?: CronDeliveryStatus;
|
||||
/** Delivery-specific error for the last failed run's failure notification. */
|
||||
lastFailureNotificationDeliveryError?: string;
|
||||
};
|
||||
|
||||
export type CronJob = CronJobBase<
|
||||
|
||||
@@ -271,6 +271,15 @@ export const CronDeliveryPatchSchema = Type.Object(
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
const CronFailureNotificationDeliverySchema = Type.Object(
|
||||
{
|
||||
delivered: Type.Optional(Type.Boolean()),
|
||||
status: CronDeliveryStatusSchema,
|
||||
error: Type.Optional(Type.String()),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const CronJobStateSchema = Type.Object(
|
||||
{
|
||||
nextRunAtMs: Type.Optional(Type.Integer({ minimum: 0 })),
|
||||
@@ -288,6 +297,9 @@ export const CronJobStateSchema = Type.Object(
|
||||
lastDelivered: Type.Optional(Type.Boolean()),
|
||||
lastDeliveryStatus: Type.Optional(CronDeliveryStatusSchema),
|
||||
lastDeliveryError: Type.Optional(Type.String()),
|
||||
lastFailureNotificationDelivered: Type.Optional(Type.Boolean()),
|
||||
lastFailureNotificationDeliveryStatus: Type.Optional(CronDeliveryStatusSchema),
|
||||
lastFailureNotificationDeliveryError: Type.Optional(Type.String()),
|
||||
lastFailureAlertAtMs: Type.Optional(Type.Integer({ minimum: 0 })),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
@@ -308,6 +320,9 @@ const CronJobStatePatchSchema = Type.Object(
|
||||
lastDelivered: Type.Optional(Type.Boolean()),
|
||||
lastDeliveryStatus: Type.Optional(CronDeliveryStatusSchema),
|
||||
lastDeliveryError: Type.Optional(Type.String()),
|
||||
lastFailureNotificationDelivered: Type.Optional(Type.Boolean()),
|
||||
lastFailureNotificationDeliveryStatus: Type.Optional(CronDeliveryStatusSchema),
|
||||
lastFailureNotificationDeliveryError: Type.Optional(Type.String()),
|
||||
lastFailureAlertAtMs: Type.Optional(Type.Integer({ minimum: 0 })),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
@@ -423,6 +438,7 @@ export const CronRunLogEntrySchema = Type.Object(
|
||||
delivered: Type.Optional(Type.Boolean()),
|
||||
deliveryStatus: Type.Optional(CronDeliveryStatusSchema),
|
||||
deliveryError: Type.Optional(Type.String()),
|
||||
failureNotificationDelivery: Type.Optional(CronFailureNotificationDeliverySchema),
|
||||
sessionId: Type.Optional(NonEmptyString),
|
||||
sessionKey: Type.Optional(NonEmptyString),
|
||||
runId: Type.Optional(NonEmptyString),
|
||||
|
||||
@@ -100,6 +100,12 @@ function toPluginCronJob(job: CronJob): PluginHookGatewayCronJob {
|
||||
lastRunStatus: job.state.lastRunStatus,
|
||||
lastError: job.state.lastError,
|
||||
lastDurationMs: job.state.lastDurationMs,
|
||||
lastDelivered: job.state.lastDelivered,
|
||||
lastDeliveryStatus: job.state.lastDeliveryStatus,
|
||||
lastDeliveryError: job.state.lastDeliveryError,
|
||||
lastFailureNotificationDelivered: job.state.lastFailureNotificationDelivered,
|
||||
lastFailureNotificationDeliveryStatus: job.state.lastFailureNotificationDeliveryStatus,
|
||||
lastFailureNotificationDeliveryError: job.state.lastFailureNotificationDeliveryError,
|
||||
},
|
||||
createdAtMs: job.createdAtMs,
|
||||
updatedAtMs: job.updatedAtMs,
|
||||
@@ -464,6 +470,7 @@ export function buildGatewayCronService(params: {
|
||||
delivered: evt.delivered,
|
||||
deliveryStatus: evt.deliveryStatus,
|
||||
deliveryError: evt.deliveryError,
|
||||
failureNotificationDelivery: evt.failureNotificationDelivery,
|
||||
delivery: evt.delivery,
|
||||
sessionId: evt.sessionId,
|
||||
sessionKey: evt.sessionKey,
|
||||
|
||||
@@ -666,6 +666,12 @@ export type PluginHookGatewayCronJobState = {
|
||||
lastRunStatus?: PluginHookGatewayCronRunStatus;
|
||||
lastError?: string;
|
||||
lastDurationMs?: number;
|
||||
lastDelivered?: boolean;
|
||||
lastDeliveryStatus?: PluginHookGatewayCronDeliveryStatus;
|
||||
lastDeliveryError?: string;
|
||||
lastFailureNotificationDelivered?: boolean;
|
||||
lastFailureNotificationDeliveryStatus?: PluginHookGatewayCronDeliveryStatus;
|
||||
lastFailureNotificationDeliveryError?: string;
|
||||
};
|
||||
|
||||
export type PluginHookGatewayCronJob = {
|
||||
|
||||
@@ -594,6 +594,9 @@ export type CronJobState = {
|
||||
lastDelivered?: boolean;
|
||||
lastDeliveryStatus?: CronDeliveryStatus;
|
||||
lastDeliveryError?: string;
|
||||
lastFailureNotificationDelivered?: boolean;
|
||||
lastFailureNotificationDeliveryStatus?: CronDeliveryStatus;
|
||||
lastFailureNotificationDeliveryError?: string;
|
||||
lastFailureAlertAtMs?: number;
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user