From 7d76e54f2bbda91d848d92dc72324dd9f6104f04 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 29 May 2026 05:36:43 -0400 Subject: [PATCH] fix: honor cron backoff from run end --- src/cron/service.jobs.test.ts | 30 +++++++++ src/cron/service.restart-catchup.test.ts | 72 ++++++++++++++++++++ src/cron/service/jobs.ts | 77 +++++++++++---------- src/cron/service/timer.ts | 85 +++++++++++++++++++----- 4 files changed, 213 insertions(+), 51 deletions(-) diff --git a/src/cron/service.jobs.test.ts b/src/cron/service.jobs.test.ts index 4f66764a662..c27bb0adfa1 100644 --- a/src/cron/service.jobs.test.ts +++ b/src/cron/service.jobs.test.ts @@ -822,6 +822,36 @@ describe("recomputeNextRuns", () => { expect(job.state.nextRunAtMs).toBe(now); }); + it("keeps recovered recurring error retries behind run-end backoff", () => { + const startedAt = Date.parse("2026-03-01T12:00:00.000Z"); + const durationMs = 90_000; + const now = startedAt + 31_000; + const job: CronJob = { + id: "failed-every-long-run", + name: "failed every long run", + enabled: true, + createdAtMs: startedAt - 60_000, + updatedAtMs: startedAt, + schedule: { kind: "every", everyMs: 1_000, anchorMs: startedAt - 60_000 }, + sessionTarget: "main", + wakeMode: "now", + payload: { kind: "systemEvent", text: "tick" }, + state: { + lastRunAtMs: startedAt, + lastDurationMs: durationMs, + lastStatus: "error", + consecutiveErrors: 1, + }, + }; + const state = { + ...createMockState(now), + store: { version: 1 as const, jobs: [job] }, + } as CronServiceState; + + expect(recomputeNextRuns(state)).toBe(true); + expect(job.state.nextRunAtMs).toBe(startedAt + durationMs + 30_000); + }); + it("repairs future cron nextRunAtMs values that are not schedule slots", () => { const now = Date.parse("2026-05-05T12:00:00.000Z"); const badFuture = Date.parse("2026-05-12T16:00:00.000Z"); diff --git a/src/cron/service.restart-catchup.test.ts b/src/cron/service.restart-catchup.test.ts index 28dba2ce467..39058b08da0 100644 --- a/src/cron/service.restart-catchup.test.ts +++ b/src/cron/service.restart-catchup.test.ts @@ -398,6 +398,78 @@ describe("CronService restart catch-up", () => { ); }); + it("keeps missed cron slots paused until run-end error backoff expires after restart", async () => { + vi.setSystemTime(new Date("2025-12-13T04:01:59.000Z")); + await withRestartedCron( + [ + { + id: "restart-long-run-backoff-pending", + name: "long run backoff pending", + enabled: true, + createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"), + updatedAtMs: Date.parse("2025-12-13T04:01:30.000Z"), + schedule: { kind: "cron", expr: "* * * * *", tz: "UTC" }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "do not replay long failed run" }, + state: { + nextRunAtMs: Date.parse("2025-12-13T04:10:00.000Z"), + lastRunAtMs: Date.parse("2025-12-13T04:00:00.000Z"), + lastDurationMs: 90_000, + lastStatus: "error", + consecutiveErrors: 1, + }, + }, + ], + async ({ cron, enqueueSystemEvent, requestHeartbeat }) => { + expect(enqueueSystemEvent).not.toHaveBeenCalled(); + expect(requestHeartbeat).not.toHaveBeenCalled(); + + const listedJobs = await cron.list({ includeDisabled: true }); + const updated = listedJobs.find((job) => job.id === "restart-long-run-backoff-pending"); + expect(updated?.state.nextRunAtMs).toBe(Date.parse("2025-12-13T04:02:00.000Z")); + }, + ); + }); + + it("keeps past-due retries paused until run-end error backoff expires after restart", async () => { + vi.setSystemTime(new Date("2025-12-13T04:01:59.000Z")); + await withRestartedCron( + [ + { + id: "restart-long-run-due-retry", + name: "long run due retry", + enabled: true, + createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"), + updatedAtMs: Date.parse("2025-12-13T04:00:30.000Z"), + schedule: { + kind: "every", + everyMs: 60_000, + anchorMs: Date.parse("2025-12-13T04:00:00.000Z"), + }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "do not run early retry" }, + state: { + nextRunAtMs: Date.parse("2025-12-13T04:00:30.000Z"), + lastRunAtMs: Date.parse("2025-12-13T04:00:00.000Z"), + lastDurationMs: 90_000, + lastStatus: "error", + consecutiveErrors: 1, + }, + }, + ], + async ({ cron, enqueueSystemEvent, requestHeartbeat }) => { + expect(enqueueSystemEvent).not.toHaveBeenCalled(); + expect(requestHeartbeat).not.toHaveBeenCalled(); + + const listedJobs = await cron.list({ includeDisabled: true }); + const updated = listedJobs.find((job) => job.id === "restart-long-run-due-retry"); + expect(updated?.state.nextRunAtMs).toBe(Date.parse("2025-12-13T04:02:00.000Z")); + }, + ); + }); + it("replays missed cron slot after restart when error backoff has already elapsed", async () => { vi.setSystemTime(new Date("2025-12-13T04:02:00.000Z")); await withRestartedCron( diff --git a/src/cron/service/jobs.ts b/src/cron/service/jobs.ts index be1c34317c1..beb74606b3f 100644 --- a/src/cron/service/jobs.ts +++ b/src/cron/service/jobs.ts @@ -62,6 +62,26 @@ export function errorBackoffMs( return scheduleMs[Math.max(0, idx)] ?? DEFAULT_ERROR_BACKOFF_SCHEDULE_MS[0]; } +export function resolveJobErrorBackoffUntilMs( + job: CronJob, + scheduleMs = DEFAULT_ERROR_BACKOFF_SCHEDULE_MS, +): number | undefined { + if (job.state.lastStatus !== "error" || !isFiniteTimestamp(job.state.lastRunAtMs)) { + return undefined; + } + const consecutiveErrorsRaw = job.state.consecutiveErrors; + const consecutiveErrors = + typeof consecutiveErrorsRaw === "number" && Number.isFinite(consecutiveErrorsRaw) + ? Math.max(1, Math.floor(consecutiveErrorsRaw)) + : 1; + const lastDurationMs = + typeof job.state.lastDurationMs === "number" && Number.isFinite(job.state.lastDurationMs) + ? Math.max(0, Math.floor(job.state.lastDurationMs)) + : 0; + const lastEndedAtMs = job.state.lastRunAtMs + lastDurationMs; + return lastEndedAtMs + errorBackoffMs(consecutiveErrors, scheduleMs); +} + function resolveStableCronOffsetMs(jobId: string, staggerMs: number) { if (staggerMs <= 1) { return 0; @@ -154,26 +174,11 @@ function isPendingErrorBackoffSlot(params: { nowMs: number; }): boolean { const { state, job, nextRunAtMs, nowMs } = params; - if (job.state.lastStatus !== "error" || !isFiniteTimestamp(job.state.lastRunAtMs)) { - return false; - } - const consecutiveErrorsRaw = job.state.consecutiveErrors; - const consecutiveErrors = - typeof consecutiveErrorsRaw === "number" && Number.isFinite(consecutiveErrorsRaw) - ? Math.max(1, Math.floor(consecutiveErrorsRaw)) - : 1; - const lastDurationMs = - typeof job.state.lastDurationMs === "number" && Number.isFinite(job.state.lastDurationMs) - ? Math.max(0, Math.floor(job.state.lastDurationMs)) - : 0; - const lastEndedAtMs = job.state.lastRunAtMs + lastDurationMs; - const backoffFloor = - lastEndedAtMs + - errorBackoffMs( - consecutiveErrors, - state.deps.cronConfig?.retry?.backoffMs ?? DEFAULT_ERROR_BACKOFF_SCHEDULE_MS, - ); - return nowMs < backoffFloor && nextRunAtMs <= backoffFloor; + const backoffUntilMs = resolveJobErrorBackoffUntilMs( + job, + state.deps.cronConfig?.retry?.backoffMs ?? DEFAULT_ERROR_BACKOFF_SCHEDULE_MS, + ); + return backoffUntilMs !== undefined && nowMs < backoffUntilMs && nextRunAtMs <= backoffUntilMs; } function shouldRepairFutureCronNextRunAtMs(params: { @@ -552,19 +557,12 @@ function recomputeJobNextRunAtMs(params: { state: CronServiceState; job: CronJob params.job.state.lastStatus === "error" && isFiniteTimestamp(params.job.state.lastRunAtMs) ) { - const consecutiveErrorsRaw = params.job.state.consecutiveErrors; - const consecutiveErrors = - typeof consecutiveErrorsRaw === "number" && Number.isFinite(consecutiveErrorsRaw) - ? Math.max(1, Math.floor(consecutiveErrorsRaw)) - : 1; - const backoffFloor = - params.job.state.lastRunAtMs + - errorBackoffMs( - consecutiveErrors, - params.state.deps.cronConfig?.retry?.backoffMs ?? DEFAULT_ERROR_BACKOFF_SCHEDULE_MS, - ); + const backoffFloor = resolveJobErrorBackoffUntilMs( + params.job, + params.state.deps.cronConfig?.retry?.backoffMs ?? DEFAULT_ERROR_BACKOFF_SCHEDULE_MS, + ); if (newNext !== undefined) { - newNext = Math.max(newNext, backoffFloor); + newNext = backoffFloor !== undefined ? Math.max(newNext, backoffFloor) : newNext; } } if (params.job.state.nextRunAtMs !== newNext) { @@ -634,11 +632,20 @@ export function recomputeNextRunsForMaintenance( now >= job.state.nextRunAtMs && typeof job.state.runningAtMs !== "number" ) { - // Only advance when the expired slot was already executed. - // If not, preserve the past-due value so the job can still run. + // Only advance when the expired slot was already executed, or when + // old start-based retry state predates the active run-end backoff. + // Otherwise preserve the past-due value so the job can still run. const lastRun = job.state.lastRunAtMs; const alreadyExecutedSlot = isFiniteTimestamp(lastRun) && lastRun >= job.state.nextRunAtMs; - if (alreadyExecutedSlot) { + const backoffUntilMs = resolveJobErrorBackoffUntilMs( + job, + state.deps.cronConfig?.retry?.backoffMs ?? DEFAULT_ERROR_BACKOFF_SCHEDULE_MS, + ); + const isStaleBackoffSlot = + backoffUntilMs !== undefined && + now < backoffUntilMs && + job.state.nextRunAtMs < backoffUntilMs; + if (alreadyExecutedSlot || isStaleBackoffSlot) { if (recomputeJobNextRunAtMs({ state, job, nowMs: now })) { changed = true; } diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index e95eb9c39d9..d43b2c2bbb6 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -57,6 +57,7 @@ import { nextWakeAtMs, recomputeNextRunsForMaintenance, recordScheduleComputeError, + resolveJobErrorBackoffUntilMs, resolveJobPayloadTextForMain, } from "./jobs.js"; import { locked } from "./locked.js"; @@ -1467,6 +1468,7 @@ export async function onTimer(state: CronServiceState) { } function isRunnableJob(params: { + state: CronServiceState; job: CronJob; nowMs: number; skipJobIds?: ReadonlySet; @@ -1504,14 +1506,14 @@ function isRunnableJob(params: { return false; } const next = job.state.nextRunAtMs; + if (isErrorBackoffPending(params.state, job, nowMs)) { + // Error retry windows are anchored at run end; persisted start-based + // retry timestamps from older state must not bypass active backoff. + return false; + } if (hasScheduledNextRunAtMs(next) && nowMs >= next) { return true; } - if (hasScheduledNextRunAtMs(next) && next > nowMs && isErrorBackoffPending(job, nowMs)) { - // Respect active retry backoff windows on restart, but allow missed-slot - // replay once the backoff window has elapsed. - return false; - } if (!params.allowCronMissedRunByLastRun || job.schedule.kind !== "cron") { return false; } @@ -1532,20 +1534,15 @@ function isRunnableJob(params: { return previousRunAtMs > lastRunAtMs; } -function isErrorBackoffPending(job: CronJob, nowMs: number): boolean { +function isErrorBackoffPending(state: CronServiceState, job: CronJob, nowMs: number): boolean { if (job.schedule.kind === "at" || job.state.lastStatus !== "error") { return false; } - const lastRunAtMs = job.state.lastRunAtMs; - if (typeof lastRunAtMs !== "number" || !Number.isFinite(lastRunAtMs)) { - return false; - } - const consecutiveErrorsRaw = job.state.consecutiveErrors; - const consecutiveErrors = - typeof consecutiveErrorsRaw === "number" && Number.isFinite(consecutiveErrorsRaw) - ? Math.max(1, Math.floor(consecutiveErrorsRaw)) - : 1; - return nowMs < lastRunAtMs + errorBackoffMs(consecutiveErrors); + const backoffUntilMs = resolveJobErrorBackoffUntilMs( + job, + state.deps.cronConfig?.retry?.backoffMs ?? DEFAULT_ERROR_BACKOFF_SCHEDULE_MS, + ); + return backoffUntilMs !== undefined && nowMs < backoffUntilMs; } function collectRunnableJobs( @@ -1562,6 +1559,7 @@ function collectRunnableJobs( } return state.store.jobs.filter((job) => isRunnableJob({ + state, job, nowMs, skipJobIds: opts?.skipJobIds, @@ -1571,6 +1569,55 @@ function collectRunnableJobs( ); } +function deferPendingBackoffMissedCronSlots( + state: CronServiceState, + nowMs: number, + opts?: { skipJobIds?: ReadonlySet }, +): boolean { + if (!state.store) { + return false; + } + let changed = false; + for (const job of state.store.jobs) { + if ( + !isJobEnabled(job) || + job.schedule.kind !== "cron" || + opts?.skipJobIds?.has(job.id) || + typeof job.state.runningAtMs === "number" + ) { + continue; + } + const backoffUntilMs = resolveJobErrorBackoffUntilMs( + job, + state.deps.cronConfig?.retry?.backoffMs ?? DEFAULT_ERROR_BACKOFF_SCHEDULE_MS, + ); + if (backoffUntilMs === undefined || nowMs >= backoffUntilMs) { + continue; + } + let previousRunAtMs: number | undefined; + try { + previousRunAtMs = computeJobPreviousRunAtMs(job, nowMs); + } catch { + continue; + } + const lastRunAtMs = job.state.lastRunAtMs; + if ( + typeof previousRunAtMs !== "number" || + !Number.isFinite(previousRunAtMs) || + typeof lastRunAtMs !== "number" || + !Number.isFinite(lastRunAtMs) || + previousRunAtMs <= lastRunAtMs + ) { + continue; + } + if (job.state.nextRunAtMs !== backoffUntilMs) { + job.state.nextRunAtMs = backoffUntilMs; + changed = true; + } + } + return changed; +} + export async function runMissedJobs( state: CronServiceState, opts?: { skipJobIds?: ReadonlySet; deferAgentTurnJobs?: boolean }, @@ -1599,12 +1646,18 @@ async function planStartupCatchup( } const now = state.deps.nowMs(); + const deferredBackoffMissedSlot = deferPendingBackoffMissedCronSlots(state, now, { + skipJobIds: opts?.skipJobIds, + }); const missed = collectRunnableJobs(state, now, { skipJobIds: opts?.skipJobIds, skipAtIfAlreadyRan: true, allowCronMissedRunByLastRun: true, }); if (missed.length === 0) { + if (deferredBackoffMissedSlot) { + await persist(state); + } return { candidates: [], deferredJobs: [] }; } const sorted = missed.toSorted(