mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 09:20:43 +00:00
fix(cron): alert on persistent skipped runs
This commit is contained in:
@@ -26,6 +26,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Cron: preserve exact `NO_REPLY` tool results from isolated jobs with empty final assistant turns as quiet successes instead of surfacing incomplete-turn errors. Fixes #68452; carries forward #68453. Thanks @anyech.
|
||||
- Cron: resolve failure alerts and failure-destination announcements against `session:<id>` targets before falling back to the creator session, so jobs created from group chats can notify the targeted direct session without cross-account routing errors. Refs #62777; carries forward #68535. Thanks @slideshow-dingo and @likewen-tech.
|
||||
- Discord: preserve explicit `user:` and `channel:` delivery targets through plugin routing so cron announcements and failure alerts keep their intended recipient kind. Refs #62777; carries forward #62798. Thanks @neeravmakwana.
|
||||
- Cron: add `failureAlert.includeSkipped` and `openclaw cron edit --failure-alert-include-skipped` so persistently skipped jobs can alert without counting skips as execution errors or affecting retry backoff. Fixes #60846. Thanks @slideshow-dingo.
|
||||
- Cron: classify isolated runs as errors from structured embedded-run execution-denial metadata, with final-output marker fallback for `SYSTEM_RUN_DENIED`, `INVALID_REQUEST`, and approval-binding refusals, so blocked commands no longer appear green in cron history. Fixes #67172; carries forward #67186. Thanks @oc-gh-dr, @hclsys, and @1yihui.
|
||||
- Onboarding/GitHub Copilot: add manifest-owned `--github-copilot-token` support for non-interactive setup, including env fallback, tokenRef storage in ref mode, saved-profile reuse, and current Copilot default-model wiring. Refs #50002 and supersedes #50003. Thanks @scottgl9.
|
||||
- Gateway/install: add a validated `--wrapper`/`OPENCLAW_WRAPPER` service install path that persists executable LaunchAgent/systemd wrappers across forced reinstalls, updates, and doctor repairs instead of falling back to raw node/bun `ProgramArguments`. Fixes #69400. (#72445) Thanks @willtmc.
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
c4b54de7557cd14b35a629585ad706a4e7de411cc725bcbce921f22bfaf14ada config-baseline.json
|
||||
3fd4da36f28b508f8e6ac4fceb18262244d8ed70df15244192032ec71027bb4f config-baseline.core.json
|
||||
d2b40fe44761f9e412ce3d4336f341c9c4406f990d09219898cb97cd12c0fdd1 config-baseline.json
|
||||
200c156a074a1eec03bb04b3852b4fd5f1fa4ffa140cc5acdc5e412a33600f14 config-baseline.core.json
|
||||
07963db49502132f26db396c56b36e018b110e6c55a68b3cb012d3ec96f43901 config-baseline.channel.json
|
||||
74b74cb18ac37c0acaa765f398f1f9edbcee4c43567f02d45c89598a1e13afb4 config-baseline.plugin.json
|
||||
|
||||
@@ -161,6 +161,7 @@ Failure notifications follow a separate destination path:
|
||||
- `job.delivery.failureDestination` overrides that per job.
|
||||
- If neither is set and the job already delivers via `announce`, failure notifications now fall back to that primary announce target.
|
||||
- `delivery.failureDestination` is only supported on `sessionTarget="isolated"` jobs unless the primary delivery mode is `webhook`.
|
||||
- `failureAlert.includeSkipped: true` opts a job or global cron alert policy into repeated skipped-run alerts. Skipped runs keep a separate consecutive skip counter, so they do not affect execution-error backoff.
|
||||
|
||||
## CLI examples
|
||||
|
||||
|
||||
@@ -81,6 +81,8 @@ One-shot jobs delete after success by default. Use `--keep-after-run` to preserv
|
||||
|
||||
Recurring jobs use exponential retry backoff after consecutive errors: 30s, 1m, 5m, 15m, 60m. The schedule returns to normal after the next successful run.
|
||||
|
||||
Skipped runs are tracked separately from execution errors. They do not affect retry backoff, but `openclaw cron edit <job-id> --failure-alert-include-skipped` can opt failure alerts into repeated skipped-run notifications.
|
||||
|
||||
### Manual runs
|
||||
|
||||
`openclaw cron run` returns as soon as the manual run is queued. Successful responses include `{ ok: true, enqueued: true, runId }`. Use `openclaw cron runs --id <job-id>` to follow the eventual outcome.
|
||||
|
||||
@@ -1126,6 +1126,7 @@ Applies only to one-shot cron jobs. Recurring jobs use separate failure handling
|
||||
enabled: false,
|
||||
after: 3,
|
||||
cooldownMs: 3600000,
|
||||
includeSkipped: false,
|
||||
mode: "announce",
|
||||
accountId: "main",
|
||||
},
|
||||
@@ -1136,6 +1137,7 @@ Applies only to one-shot cron jobs. Recurring jobs use separate failure handling
|
||||
- `enabled`: enable failure alerts for cron jobs (default: `false`).
|
||||
- `after`: consecutive failures before an alert fires (positive integer, min: `1`).
|
||||
- `cooldownMs`: minimum milliseconds between repeated alerts for the same job (non-negative integer).
|
||||
- `includeSkipped`: count consecutive skipped runs toward the alert threshold (default: `false`). Skipped runs are tracked separately and do not affect execution-error backoff.
|
||||
- `mode`: delivery mode — `"announce"` sends via a channel message; `"webhook"` posts to the configured webhook.
|
||||
- `accountId`: optional account or channel id to scope alert delivery.
|
||||
|
||||
|
||||
@@ -126,9 +126,9 @@ describe("CronToolSchema", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("job.failureAlert exposes after, channel, to, cooldownMs, mode, accountId", () => {
|
||||
it("job.failureAlert exposes after, channel, to, cooldownMs, includeSkipped, mode, accountId", () => {
|
||||
expect(keysAt(schemaRecord, "job.failureAlert")).toEqual(
|
||||
["accountId", "after", "channel", "cooldownMs", "mode", "to"].toSorted(),
|
||||
["accountId", "after", "channel", "cooldownMs", "includeSkipped", "mode", "to"].toSorted(),
|
||||
);
|
||||
});
|
||||
|
||||
|
||||
@@ -210,6 +210,9 @@ const CronFailureAlertSchema = Type.Optional(
|
||||
channel: Type.Optional(Type.String({ description: "Alert channel" })),
|
||||
to: Type.Optional(Type.String({ description: "Alert target" })),
|
||||
cooldownMs: Type.Optional(Type.Number({ description: "Cooldown between alerts in ms" })),
|
||||
includeSkipped: Type.Optional(
|
||||
Type.Boolean({ description: "Count consecutive skipped runs toward alerting" }),
|
||||
),
|
||||
mode: optionalStringEnum(["announce", "webhook"] as const),
|
||||
accountId: Type.Optional(Type.String()),
|
||||
},
|
||||
|
||||
@@ -963,4 +963,48 @@ describe("cron cli", () => {
|
||||
expect(patch?.patch?.failureAlert?.mode).toBe("webhook");
|
||||
expect(patch?.patch?.failureAlert?.accountId).toBe("bot-a");
|
||||
});
|
||||
|
||||
it("patches skipped-run inclusion for failure alerts on cron edit", async () => {
|
||||
callGatewayFromCli.mockClear();
|
||||
|
||||
const program = buildProgram();
|
||||
|
||||
await program.parseAsync(["cron", "edit", "job-1", "--failure-alert-include-skipped"], {
|
||||
from: "user",
|
||||
});
|
||||
|
||||
const updateCall = callGatewayFromCli.mock.calls.find((call) => call[0] === "cron.update");
|
||||
const patch = updateCall?.[2] as {
|
||||
patch?: {
|
||||
failureAlert?: {
|
||||
includeSkipped?: boolean;
|
||||
};
|
||||
};
|
||||
};
|
||||
|
||||
expect(patch?.patch?.failureAlert?.includeSkipped).toBe(true);
|
||||
});
|
||||
|
||||
it("rejects conflicting skipped-run failure alert flags", async () => {
|
||||
callGatewayFromCli.mockClear();
|
||||
|
||||
const program = buildProgram();
|
||||
|
||||
await expect(
|
||||
program.parseAsync(
|
||||
[
|
||||
"cron",
|
||||
"edit",
|
||||
"job-1",
|
||||
"--failure-alert-include-skipped",
|
||||
"--failure-alert-exclude-skipped",
|
||||
],
|
||||
{ from: "user" },
|
||||
),
|
||||
).rejects.toThrow("__exit__:1");
|
||||
expect(defaultRuntime.error).toHaveBeenCalledWith(
|
||||
expect.stringContaining("Use either --failure-alert-include-skipped"),
|
||||
);
|
||||
expect(callGatewayFromCli).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -86,6 +86,8 @@ export function registerCronEditCommand(cron: Command) {
|
||||
)
|
||||
.option("--failure-alert-to <dest>", "Failure alert destination")
|
||||
.option("--failure-alert-cooldown <duration>", "Minimum time between alerts (e.g. 1h, 30m)")
|
||||
.option("--failure-alert-include-skipped", "Count consecutive skipped runs toward alerts")
|
||||
.option("--failure-alert-exclude-skipped", "Alert only on execution errors")
|
||||
.option("--failure-alert-mode <mode>", "Failure alert delivery mode (announce or webhook)")
|
||||
.option(
|
||||
"--failure-alert-account-id <id>",
|
||||
@@ -260,13 +262,24 @@ export function registerCronEditCommand(cron: Command) {
|
||||
const hasFailureAlertChannel = typeof opts.failureAlertChannel === "string";
|
||||
const hasFailureAlertTo = typeof opts.failureAlertTo === "string";
|
||||
const hasFailureAlertCooldown = typeof opts.failureAlertCooldown === "string";
|
||||
const hasFailureAlertIncludeSkipped =
|
||||
typeof opts.failureAlertIncludeSkipped === "boolean";
|
||||
const hasFailureAlertExcludeSkipped =
|
||||
typeof opts.failureAlertExcludeSkipped === "boolean";
|
||||
const hasFailureAlertMode = typeof opts.failureAlertMode === "string";
|
||||
const hasFailureAlertAccountId = typeof opts.failureAlertAccountId === "string";
|
||||
if (hasFailureAlertIncludeSkipped && hasFailureAlertExcludeSkipped) {
|
||||
throw new Error(
|
||||
"Use either --failure-alert-include-skipped or --failure-alert-exclude-skipped.",
|
||||
);
|
||||
}
|
||||
const hasFailureAlertFields =
|
||||
hasFailureAlertAfter ||
|
||||
hasFailureAlertChannel ||
|
||||
hasFailureAlertTo ||
|
||||
hasFailureAlertCooldown ||
|
||||
hasFailureAlertIncludeSkipped ||
|
||||
hasFailureAlertExcludeSkipped ||
|
||||
hasFailureAlertMode ||
|
||||
hasFailureAlertAccountId;
|
||||
const failureAlertFlag =
|
||||
@@ -299,6 +312,9 @@ export function registerCronEditCommand(cron: Command) {
|
||||
}
|
||||
failureAlert.cooldownMs = cooldownMs;
|
||||
}
|
||||
if (hasFailureAlertIncludeSkipped || hasFailureAlertExcludeSkipped) {
|
||||
failureAlert.includeSkipped = hasFailureAlertIncludeSkipped;
|
||||
}
|
||||
if (hasFailureAlertMode) {
|
||||
const mode = normalizeOptionalLowercaseString(opts.failureAlertMode);
|
||||
if (mode !== "announce" && mode !== "webhook") {
|
||||
|
||||
@@ -20898,6 +20898,9 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = {
|
||||
minimum: 0,
|
||||
maximum: 9007199254740991,
|
||||
},
|
||||
includeSkipped: {
|
||||
type: "boolean",
|
||||
},
|
||||
mode: {
|
||||
type: "string",
|
||||
enum: ["announce", "webhook"],
|
||||
|
||||
@@ -16,6 +16,7 @@ export type CronFailureAlertConfig = {
|
||||
enabled?: boolean;
|
||||
after?: number;
|
||||
cooldownMs?: number;
|
||||
includeSkipped?: boolean;
|
||||
mode?: "announce" | "webhook";
|
||||
accountId?: string;
|
||||
};
|
||||
|
||||
@@ -598,6 +598,7 @@ export const OpenClawSchema = z
|
||||
enabled: z.boolean().optional(),
|
||||
after: z.number().int().min(1).optional(),
|
||||
cooldownMs: z.number().int().min(0).optional(),
|
||||
includeSkipped: z.boolean().optional(),
|
||||
mode: z.enum(["announce", "webhook"]).optional(),
|
||||
accountId: z.string().optional(),
|
||||
})
|
||||
|
||||
@@ -204,6 +204,68 @@ describe("CronService failure alerts", () => {
|
||||
await store.cleanup();
|
||||
});
|
||||
|
||||
it("preserves includeSkipped through failure alert updates", async () => {
|
||||
const store = await makeStorePath();
|
||||
const sendCronFailureAlert = vi.fn(async () => undefined);
|
||||
const runIsolatedAgentJob = vi.fn(async () => ({
|
||||
status: "skipped" as const,
|
||||
error: "requests-in-flight",
|
||||
}));
|
||||
|
||||
const cron = createFailureAlertCron({
|
||||
storePath: store.storePath,
|
||||
cronConfig: {
|
||||
failureAlert: {
|
||||
enabled: true,
|
||||
after: 1,
|
||||
},
|
||||
},
|
||||
runIsolatedAgentJob,
|
||||
sendCronFailureAlert,
|
||||
});
|
||||
|
||||
await cron.start();
|
||||
const job = await cron.add({
|
||||
name: "updated skipped alert job",
|
||||
enabled: true,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "isolated",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "agentTurn", message: "run report" },
|
||||
failureAlert: {
|
||||
after: 1,
|
||||
channel: "telegram",
|
||||
to: "12345",
|
||||
},
|
||||
});
|
||||
|
||||
const updated = await cron.update(job.id, {
|
||||
failureAlert: {
|
||||
includeSkipped: true,
|
||||
},
|
||||
});
|
||||
expect(updated?.failureAlert).toEqual(
|
||||
expect.objectContaining({
|
||||
after: 1,
|
||||
channel: "telegram",
|
||||
to: "12345",
|
||||
includeSkipped: true,
|
||||
}),
|
||||
);
|
||||
|
||||
await cron.run(job.id, "force");
|
||||
expect(sendCronFailureAlert).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
channel: "telegram",
|
||||
to: "12345",
|
||||
text: expect.stringContaining('Cron job "updated skipped alert job" skipped 1 times'),
|
||||
}),
|
||||
);
|
||||
|
||||
cron.stop();
|
||||
await store.cleanup();
|
||||
});
|
||||
|
||||
it("threads failure alert mode/accountId and skips best-effort jobs", async () => {
|
||||
const store = await makeStorePath();
|
||||
const sendCronFailureAlert = vi.fn(async () => undefined);
|
||||
@@ -267,4 +329,103 @@ describe("CronService failure alerts", () => {
|
||||
cron.stop();
|
||||
await store.cleanup();
|
||||
});
|
||||
|
||||
it("alerts for repeated skipped runs only when opted in", async () => {
|
||||
const store = await makeStorePath();
|
||||
const sendCronFailureAlert = vi.fn(async () => undefined);
|
||||
const runIsolatedAgentJob = vi.fn(async () => ({
|
||||
status: "skipped" as const,
|
||||
error: "disabled",
|
||||
}));
|
||||
|
||||
const cron = createFailureAlertCron({
|
||||
storePath: store.storePath,
|
||||
cronConfig: {
|
||||
failureAlert: {
|
||||
enabled: true,
|
||||
after: 2,
|
||||
cooldownMs: 60_000,
|
||||
includeSkipped: true,
|
||||
},
|
||||
},
|
||||
runIsolatedAgentJob,
|
||||
sendCronFailureAlert,
|
||||
});
|
||||
|
||||
await cron.start();
|
||||
const job = await cron.add({
|
||||
name: "gateway restart",
|
||||
enabled: true,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "isolated",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "agentTurn", message: "restart gateway if needed" },
|
||||
delivery: { mode: "announce", channel: "telegram", to: "19098680" },
|
||||
});
|
||||
|
||||
await cron.run(job.id, "force");
|
||||
expect(sendCronFailureAlert).not.toHaveBeenCalled();
|
||||
|
||||
await cron.run(job.id, "force");
|
||||
expect(sendCronFailureAlert).toHaveBeenCalledTimes(1);
|
||||
expect(sendCronFailureAlert).toHaveBeenLastCalledWith(
|
||||
expect.objectContaining({
|
||||
channel: "telegram",
|
||||
to: "19098680",
|
||||
text: expect.stringMatching(
|
||||
/Cron job "gateway restart" skipped 2 times\nSkip reason: disabled/,
|
||||
),
|
||||
}),
|
||||
);
|
||||
|
||||
const skippedJob = cron.getJob(job.id);
|
||||
expect(skippedJob?.state.consecutiveSkipped).toBe(2);
|
||||
expect(skippedJob?.state.consecutiveErrors).toBe(0);
|
||||
|
||||
cron.stop();
|
||||
await store.cleanup();
|
||||
});
|
||||
|
||||
it("tracks skipped runs without alerting or affecting error backoff when includeSkipped is off", async () => {
|
||||
const store = await makeStorePath();
|
||||
const sendCronFailureAlert = vi.fn(async () => undefined);
|
||||
const runIsolatedAgentJob = vi.fn(async () => ({
|
||||
status: "skipped" as const,
|
||||
error: "requests-in-flight",
|
||||
}));
|
||||
|
||||
const cron = createFailureAlertCron({
|
||||
storePath: store.storePath,
|
||||
cronConfig: {
|
||||
failureAlert: {
|
||||
enabled: true,
|
||||
after: 1,
|
||||
},
|
||||
},
|
||||
runIsolatedAgentJob,
|
||||
sendCronFailureAlert,
|
||||
});
|
||||
|
||||
await cron.start();
|
||||
const job = await cron.add({
|
||||
name: "busy heartbeat",
|
||||
enabled: true,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "isolated",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "agentTurn", message: "run report" },
|
||||
delivery: { mode: "announce", channel: "telegram", to: "19098680" },
|
||||
});
|
||||
|
||||
await cron.run(job.id, "force");
|
||||
await cron.run(job.id, "force");
|
||||
|
||||
expect(sendCronFailureAlert).not.toHaveBeenCalled();
|
||||
const skippedJob = cron.getJob(job.id);
|
||||
expect(skippedJob?.state.consecutiveSkipped).toBe(2);
|
||||
expect(skippedJob?.state.consecutiveErrors).toBe(0);
|
||||
|
||||
cron.stop();
|
||||
await store.cleanup();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -860,6 +860,10 @@ function mergeCronFailureAlert(
|
||||
: -1;
|
||||
next.cooldownMs = cooldownMs >= 0 ? Math.floor(cooldownMs) : undefined;
|
||||
}
|
||||
if ("includeSkipped" in patch) {
|
||||
next.includeSkipped =
|
||||
typeof patch.includeSkipped === "boolean" ? patch.includeSkipped : undefined;
|
||||
}
|
||||
if ("mode" in patch) {
|
||||
const mode = normalizeOptionalString(patch.mode) ?? "";
|
||||
next.mode = mode === "announce" || mode === "webhook" ? mode : undefined;
|
||||
|
||||
@@ -56,6 +56,16 @@ const DEFAULT_MAX_MISSED_JOBS_PER_RESTART = 5;
|
||||
const DEFAULT_FAILURE_ALERT_AFTER = 2;
|
||||
const DEFAULT_FAILURE_ALERT_COOLDOWN_MS = 60 * 60_000; // 1 hour
|
||||
|
||||
/**
 * Effective failure-alert settings for a single job, as returned by
 * resolveFailureAlert and consumed by maybeEmitFailureAlert.
 */
type ResolvedFailureAlert = {
|
||||
// Threshold the consecutive error/skip count is compared against before alerting.
after: number;
|
||||
// Minimum milliseconds between two alerts for the same job.
cooldownMs: number;
|
||||
// Channel the alert is sent on.
channel: CronMessageChannel;
|
||||
// Optional alert target.
to?: string;
|
||||
// Delivery mode for the alert.
mode?: "announce" | "webhook";
|
||||
// Optional account id forwarded with the alert.
accountId?: string;
|
||||
// Always resolved to a boolean: job-level value, else global value, else false.
includeSkipped: boolean;
|
||||
};
|
||||
|
||||
type TimedCronRunOutcome = CronRunOutcome &
|
||||
CronRunTelemetry & {
|
||||
jobId: string;
|
||||
@@ -299,17 +309,7 @@ function clampNonNegativeInt(value: unknown, fallback: number): number {
|
||||
return floored >= 0 ? floored : fallback;
|
||||
}
|
||||
|
||||
function resolveFailureAlert(
|
||||
state: CronServiceState,
|
||||
job: CronJob,
|
||||
): {
|
||||
after: number;
|
||||
cooldownMs: number;
|
||||
channel: CronMessageChannel;
|
||||
to?: string;
|
||||
mode?: "announce" | "webhook";
|
||||
accountId?: string;
|
||||
} | null {
|
||||
function resolveFailureAlert(state: CronServiceState, job: CronJob): ResolvedFailureAlert | null {
|
||||
const globalConfig = state.deps.cronConfig?.failureAlert;
|
||||
const jobConfig = job.failureAlert === false ? undefined : job.failureAlert;
|
||||
|
||||
@@ -336,6 +336,7 @@ function resolveFailureAlert(
|
||||
to: mode === "webhook" ? explicitTo : (explicitTo ?? normalizeTo(job.delivery?.to)),
|
||||
mode,
|
||||
accountId: jobConfig?.accountId ?? globalConfig?.accountId,
|
||||
includeSkipped: jobConfig?.includeSkipped ?? globalConfig?.includeSkipped ?? false,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -349,13 +350,16 @@ function emitFailureAlert(
|
||||
to?: string;
|
||||
mode?: "announce" | "webhook";
|
||||
accountId?: string;
|
||||
status: "error" | "skipped";
|
||||
},
|
||||
) {
|
||||
const safeJobName = params.job.name || params.job.id;
|
||||
const truncatedError = (params.error?.trim() || "unknown error").slice(0, 200);
|
||||
const truncatedError = (params.error?.trim() || "unknown reason").slice(0, 200);
|
||||
const statusVerb = params.status === "skipped" ? "skipped" : "failed";
|
||||
const detailLabel = params.status === "skipped" ? "Skip reason" : "Last error";
|
||||
const text = [
|
||||
`Cron job "${safeJobName}" failed ${params.consecutiveErrors} times`,
|
||||
`Last error: ${truncatedError}`,
|
||||
`Cron job "${safeJobName}" ${statusVerb} ${params.consecutiveErrors} times`,
|
||||
`${detailLabel}: ${truncatedError}`,
|
||||
].join("\n");
|
||||
|
||||
if (state.deps.sendCronFailureAlert) {
|
||||
@@ -383,6 +387,43 @@ function emitFailureAlert(
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Emit a failure alert for a job if every gate passes, then record the alert time.
 *
 * Gates, in order:
 *  1. an alert config exists and `consecutiveCount` has reached its `after` threshold;
 *  2. the job's delivery is not marked best-effort;
 *  3. the previous alert (if any) is older than the configured cooldown.
 *
 * Used for both execution errors and skipped runs; `status` selects which kind
 * of alert is emitted and `consecutiveCount` is the matching counter.
 *
 * Side effect: sets `params.job.state.lastFailureAlertAtMs` when an alert is emitted.
 */
function maybeEmitFailureAlert(
|
||||
state: CronServiceState,
|
||||
params: {
|
||||
job: CronJob;
|
||||
// null means no alert config resolved for this job; the call is a no-op.
alertConfig: ResolvedFailureAlert | null;
|
||||
status: "error" | "skipped";
|
||||
// Error message or skip reason forwarded to the alert.
error?: string;
|
||||
// Consecutive error count or consecutive skip count, depending on `status`.
consecutiveCount: number;
|
||||
},
|
||||
) {
|
||||
// Gate 1: nothing to do without a config or below the threshold.
if (!params.alertConfig || params.consecutiveCount < params.alertConfig.after) {
|
||||
return;
|
||||
}
|
||||
// Gate 2: best-effort deliveries never raise failure alerts.
const isBestEffort = params.job.delivery?.bestEffort === true;
|
||||
if (isBestEffort) {
|
||||
return;
|
||||
}
|
||||
const now = state.deps.nowMs();
|
||||
const lastAlert = params.job.state.lastFailureAlertAtMs;
|
||||
// Gate 3: cooldown. Negative cooldown values are treated as 0. The same
// timestamp field is used whichever status triggered the previous alert.
const inCooldown =
|
||||
typeof lastAlert === "number" && now - lastAlert < Math.max(0, params.alertConfig.cooldownMs);
|
||||
if (inCooldown) {
|
||||
return;
|
||||
}
|
||||
emitFailureAlert(state, {
|
||||
job: params.job,
|
||||
error: params.error,
|
||||
// emitFailureAlert calls this field consecutiveErrors; for skipped alerts it carries the skip count.
consecutiveErrors: params.consecutiveCount,
|
||||
channel: params.alertConfig.channel,
|
||||
to: params.alertConfig.to,
|
||||
mode: params.alertConfig.mode,
|
||||
accountId: params.alertConfig.accountId,
|
||||
status: params.status,
|
||||
});
|
||||
// Start the cooldown window from this alert.
params.job.state.lastFailureAlertAtMs = now;
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply the result of a job execution to the job's state.
|
||||
* Handles consecutive error tracking, exponential backoff, one-shot disable,
|
||||
@@ -430,33 +471,36 @@ export function applyJobResult(
|
||||
deliveryState.status === "not-delivered" && result.error ? result.error : undefined;
|
||||
job.updatedAtMs = result.endedAt;
|
||||
|
||||
// Track consecutive errors for backoff / auto-disable.
|
||||
// Track consecutive errors for backoff / auto-disable; skipped runs use a
|
||||
// separate counter so opt-in skip alerts do not affect retry behavior.
|
||||
const alertConfig = resolveFailureAlert(state, job);
|
||||
if (result.status === "error") {
|
||||
job.state.consecutiveErrors = (job.state.consecutiveErrors ?? 0) + 1;
|
||||
const alertConfig = resolveFailureAlert(state, job);
|
||||
if (alertConfig && job.state.consecutiveErrors >= alertConfig.after) {
|
||||
const isBestEffort = job.delivery?.bestEffort === true;
|
||||
if (!isBestEffort) {
|
||||
const now = state.deps.nowMs();
|
||||
const lastAlert = job.state.lastFailureAlertAtMs;
|
||||
const inCooldown =
|
||||
typeof lastAlert === "number" && now - lastAlert < Math.max(0, alertConfig.cooldownMs);
|
||||
if (!inCooldown) {
|
||||
emitFailureAlert(state, {
|
||||
job,
|
||||
error: result.error,
|
||||
consecutiveErrors: job.state.consecutiveErrors,
|
||||
channel: alertConfig.channel,
|
||||
to: alertConfig.to,
|
||||
mode: alertConfig.mode,
|
||||
accountId: alertConfig.accountId,
|
||||
});
|
||||
job.state.lastFailureAlertAtMs = now;
|
||||
}
|
||||
}
|
||||
job.state.consecutiveSkipped = 0;
|
||||
maybeEmitFailureAlert(state, {
|
||||
job,
|
||||
alertConfig,
|
||||
status: "error",
|
||||
error: result.error,
|
||||
consecutiveCount: job.state.consecutiveErrors,
|
||||
});
|
||||
} else if (result.status === "skipped") {
|
||||
job.state.consecutiveErrors = 0;
|
||||
job.state.consecutiveSkipped = (job.state.consecutiveSkipped ?? 0) + 1;
|
||||
if (alertConfig?.includeSkipped) {
|
||||
maybeEmitFailureAlert(state, {
|
||||
job,
|
||||
alertConfig,
|
||||
status: "skipped",
|
||||
error: result.error,
|
||||
consecutiveCount: job.state.consecutiveSkipped,
|
||||
});
|
||||
} else {
|
||||
job.state.lastFailureAlertAtMs = undefined;
|
||||
}
|
||||
} else {
|
||||
job.state.consecutiveErrors = 0;
|
||||
job.state.consecutiveSkipped = 0;
|
||||
job.state.lastFailureAlertAtMs = undefined;
|
||||
}
|
||||
|
||||
|
||||
@@ -103,6 +103,8 @@ export type CronFailureAlert = {
|
||||
channel?: CronMessageChannel;
|
||||
to?: string;
|
||||
cooldownMs?: number;
|
||||
/** When true, consecutive skipped runs count toward the alert threshold. */
|
||||
includeSkipped?: boolean;
|
||||
/** Delivery mode: announce (via messaging channels) or webhook (HTTP POST). */
|
||||
mode?: "announce" | "webhook";
|
||||
/** Account ID for multi-account channel configurations. */
|
||||
@@ -153,6 +155,8 @@ export type CronJobState = {
|
||||
lastDurationMs?: number;
|
||||
/** Number of consecutive execution errors (reset on success or on a skipped run). Used for backoff. */
|
||||
consecutiveErrors?: number;
|
||||
/** Number of consecutive skipped executions (reset on success or error). */
|
||||
consecutiveSkipped?: number;
|
||||
/** Last failure alert timestamp (ms since epoch) for cooldown gating. */
|
||||
lastFailureAlertAtMs?: number;
|
||||
/** Number of consecutive schedule computation errors. Auto-disables job after threshold. */
|
||||
|
||||
@@ -161,6 +161,7 @@ export const CronFailureAlertSchema = Type.Object(
|
||||
channel: Type.Optional(Type.Union([Type.Literal("last"), NonEmptyString])),
|
||||
to: Type.Optional(Type.String()),
|
||||
cooldownMs: Type.Optional(Type.Integer({ minimum: 0 })),
|
||||
includeSkipped: Type.Optional(Type.Boolean()),
|
||||
mode: Type.Optional(Type.Union([Type.Literal("announce"), Type.Literal("webhook")])),
|
||||
accountId: Type.Optional(NonEmptyString),
|
||||
},
|
||||
@@ -239,6 +240,7 @@ export const CronJobStateSchema = Type.Object(
|
||||
lastErrorReason: Type.Optional(CronFailoverReasonSchema),
|
||||
lastDurationMs: Type.Optional(Type.Integer({ minimum: 0 })),
|
||||
consecutiveErrors: Type.Optional(Type.Integer({ minimum: 0 })),
|
||||
consecutiveSkipped: Type.Optional(Type.Integer({ minimum: 0 })),
|
||||
lastDelivered: Type.Optional(Type.Boolean()),
|
||||
lastDeliveryStatus: Type.Optional(CronDeliveryStatusSchema),
|
||||
lastDeliveryError: Type.Optional(Type.String()),
|
||||
|
||||
Reference in New Issue
Block a user