mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-03 22:34:05 +00:00
fix: honor cron backoff from run end
This commit is contained in:
@@ -822,6 +822,36 @@ describe("recomputeNextRuns", () => {
|
||||
expect(job.state.nextRunAtMs).toBe(now);
|
||||
});
|
||||
|
||||
it("keeps recovered recurring error retries behind run-end backoff", () => {
|
||||
const startedAt = Date.parse("2026-03-01T12:00:00.000Z");
|
||||
const durationMs = 90_000;
|
||||
const now = startedAt + 31_000;
|
||||
const job: CronJob = {
|
||||
id: "failed-every-long-run",
|
||||
name: "failed every long run",
|
||||
enabled: true,
|
||||
createdAtMs: startedAt - 60_000,
|
||||
updatedAtMs: startedAt,
|
||||
schedule: { kind: "every", everyMs: 1_000, anchorMs: startedAt - 60_000 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "now",
|
||||
payload: { kind: "systemEvent", text: "tick" },
|
||||
state: {
|
||||
lastRunAtMs: startedAt,
|
||||
lastDurationMs: durationMs,
|
||||
lastStatus: "error",
|
||||
consecutiveErrors: 1,
|
||||
},
|
||||
};
|
||||
const state = {
|
||||
...createMockState(now),
|
||||
store: { version: 1 as const, jobs: [job] },
|
||||
} as CronServiceState;
|
||||
|
||||
expect(recomputeNextRuns(state)).toBe(true);
|
||||
expect(job.state.nextRunAtMs).toBe(startedAt + durationMs + 30_000);
|
||||
});
|
||||
|
||||
it("repairs future cron nextRunAtMs values that are not schedule slots", () => {
|
||||
const now = Date.parse("2026-05-05T12:00:00.000Z");
|
||||
const badFuture = Date.parse("2026-05-12T16:00:00.000Z");
|
||||
|
||||
@@ -398,6 +398,78 @@ describe("CronService restart catch-up", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("keeps missed cron slots paused until run-end error backoff expires after restart", async () => {
|
||||
vi.setSystemTime(new Date("2025-12-13T04:01:59.000Z"));
|
||||
await withRestartedCron(
|
||||
[
|
||||
{
|
||||
id: "restart-long-run-backoff-pending",
|
||||
name: "long run backoff pending",
|
||||
enabled: true,
|
||||
createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"),
|
||||
updatedAtMs: Date.parse("2025-12-13T04:01:30.000Z"),
|
||||
schedule: { kind: "cron", expr: "* * * * *", tz: "UTC" },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "do not replay long failed run" },
|
||||
state: {
|
||||
nextRunAtMs: Date.parse("2025-12-13T04:10:00.000Z"),
|
||||
lastRunAtMs: Date.parse("2025-12-13T04:00:00.000Z"),
|
||||
lastDurationMs: 90_000,
|
||||
lastStatus: "error",
|
||||
consecutiveErrors: 1,
|
||||
},
|
||||
},
|
||||
],
|
||||
async ({ cron, enqueueSystemEvent, requestHeartbeat }) => {
|
||||
expect(enqueueSystemEvent).not.toHaveBeenCalled();
|
||||
expect(requestHeartbeat).not.toHaveBeenCalled();
|
||||
|
||||
const listedJobs = await cron.list({ includeDisabled: true });
|
||||
const updated = listedJobs.find((job) => job.id === "restart-long-run-backoff-pending");
|
||||
expect(updated?.state.nextRunAtMs).toBe(Date.parse("2025-12-13T04:02:00.000Z"));
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
it("keeps past-due retries paused until run-end error backoff expires after restart", async () => {
|
||||
vi.setSystemTime(new Date("2025-12-13T04:01:59.000Z"));
|
||||
await withRestartedCron(
|
||||
[
|
||||
{
|
||||
id: "restart-long-run-due-retry",
|
||||
name: "long run due retry",
|
||||
enabled: true,
|
||||
createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"),
|
||||
updatedAtMs: Date.parse("2025-12-13T04:00:30.000Z"),
|
||||
schedule: {
|
||||
kind: "every",
|
||||
everyMs: 60_000,
|
||||
anchorMs: Date.parse("2025-12-13T04:00:00.000Z"),
|
||||
},
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "do not run early retry" },
|
||||
state: {
|
||||
nextRunAtMs: Date.parse("2025-12-13T04:00:30.000Z"),
|
||||
lastRunAtMs: Date.parse("2025-12-13T04:00:00.000Z"),
|
||||
lastDurationMs: 90_000,
|
||||
lastStatus: "error",
|
||||
consecutiveErrors: 1,
|
||||
},
|
||||
},
|
||||
],
|
||||
async ({ cron, enqueueSystemEvent, requestHeartbeat }) => {
|
||||
expect(enqueueSystemEvent).not.toHaveBeenCalled();
|
||||
expect(requestHeartbeat).not.toHaveBeenCalled();
|
||||
|
||||
const listedJobs = await cron.list({ includeDisabled: true });
|
||||
const updated = listedJobs.find((job) => job.id === "restart-long-run-due-retry");
|
||||
expect(updated?.state.nextRunAtMs).toBe(Date.parse("2025-12-13T04:02:00.000Z"));
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
it("replays missed cron slot after restart when error backoff has already elapsed", async () => {
|
||||
vi.setSystemTime(new Date("2025-12-13T04:02:00.000Z"));
|
||||
await withRestartedCron(
|
||||
|
||||
@@ -62,6 +62,26 @@ export function errorBackoffMs(
|
||||
return scheduleMs[Math.max(0, idx)] ?? DEFAULT_ERROR_BACKOFF_SCHEDULE_MS[0];
|
||||
}
|
||||
|
||||
export function resolveJobErrorBackoffUntilMs(
|
||||
job: CronJob,
|
||||
scheduleMs = DEFAULT_ERROR_BACKOFF_SCHEDULE_MS,
|
||||
): number | undefined {
|
||||
if (job.state.lastStatus !== "error" || !isFiniteTimestamp(job.state.lastRunAtMs)) {
|
||||
return undefined;
|
||||
}
|
||||
const consecutiveErrorsRaw = job.state.consecutiveErrors;
|
||||
const consecutiveErrors =
|
||||
typeof consecutiveErrorsRaw === "number" && Number.isFinite(consecutiveErrorsRaw)
|
||||
? Math.max(1, Math.floor(consecutiveErrorsRaw))
|
||||
: 1;
|
||||
const lastDurationMs =
|
||||
typeof job.state.lastDurationMs === "number" && Number.isFinite(job.state.lastDurationMs)
|
||||
? Math.max(0, Math.floor(job.state.lastDurationMs))
|
||||
: 0;
|
||||
const lastEndedAtMs = job.state.lastRunAtMs + lastDurationMs;
|
||||
return lastEndedAtMs + errorBackoffMs(consecutiveErrors, scheduleMs);
|
||||
}
|
||||
|
||||
function resolveStableCronOffsetMs(jobId: string, staggerMs: number) {
|
||||
if (staggerMs <= 1) {
|
||||
return 0;
|
||||
@@ -154,26 +174,11 @@ function isPendingErrorBackoffSlot(params: {
|
||||
nowMs: number;
|
||||
}): boolean {
|
||||
const { state, job, nextRunAtMs, nowMs } = params;
|
||||
if (job.state.lastStatus !== "error" || !isFiniteTimestamp(job.state.lastRunAtMs)) {
|
||||
return false;
|
||||
}
|
||||
const consecutiveErrorsRaw = job.state.consecutiveErrors;
|
||||
const consecutiveErrors =
|
||||
typeof consecutiveErrorsRaw === "number" && Number.isFinite(consecutiveErrorsRaw)
|
||||
? Math.max(1, Math.floor(consecutiveErrorsRaw))
|
||||
: 1;
|
||||
const lastDurationMs =
|
||||
typeof job.state.lastDurationMs === "number" && Number.isFinite(job.state.lastDurationMs)
|
||||
? Math.max(0, Math.floor(job.state.lastDurationMs))
|
||||
: 0;
|
||||
const lastEndedAtMs = job.state.lastRunAtMs + lastDurationMs;
|
||||
const backoffFloor =
|
||||
lastEndedAtMs +
|
||||
errorBackoffMs(
|
||||
consecutiveErrors,
|
||||
state.deps.cronConfig?.retry?.backoffMs ?? DEFAULT_ERROR_BACKOFF_SCHEDULE_MS,
|
||||
);
|
||||
return nowMs < backoffFloor && nextRunAtMs <= backoffFloor;
|
||||
const backoffUntilMs = resolveJobErrorBackoffUntilMs(
|
||||
job,
|
||||
state.deps.cronConfig?.retry?.backoffMs ?? DEFAULT_ERROR_BACKOFF_SCHEDULE_MS,
|
||||
);
|
||||
return backoffUntilMs !== undefined && nowMs < backoffUntilMs && nextRunAtMs <= backoffUntilMs;
|
||||
}
|
||||
|
||||
function shouldRepairFutureCronNextRunAtMs(params: {
|
||||
@@ -552,19 +557,12 @@ function recomputeJobNextRunAtMs(params: { state: CronServiceState; job: CronJob
|
||||
params.job.state.lastStatus === "error" &&
|
||||
isFiniteTimestamp(params.job.state.lastRunAtMs)
|
||||
) {
|
||||
const consecutiveErrorsRaw = params.job.state.consecutiveErrors;
|
||||
const consecutiveErrors =
|
||||
typeof consecutiveErrorsRaw === "number" && Number.isFinite(consecutiveErrorsRaw)
|
||||
? Math.max(1, Math.floor(consecutiveErrorsRaw))
|
||||
: 1;
|
||||
const backoffFloor =
|
||||
params.job.state.lastRunAtMs +
|
||||
errorBackoffMs(
|
||||
consecutiveErrors,
|
||||
params.state.deps.cronConfig?.retry?.backoffMs ?? DEFAULT_ERROR_BACKOFF_SCHEDULE_MS,
|
||||
);
|
||||
const backoffFloor = resolveJobErrorBackoffUntilMs(
|
||||
params.job,
|
||||
params.state.deps.cronConfig?.retry?.backoffMs ?? DEFAULT_ERROR_BACKOFF_SCHEDULE_MS,
|
||||
);
|
||||
if (newNext !== undefined) {
|
||||
newNext = Math.max(newNext, backoffFloor);
|
||||
newNext = backoffFloor !== undefined ? Math.max(newNext, backoffFloor) : newNext;
|
||||
}
|
||||
}
|
||||
if (params.job.state.nextRunAtMs !== newNext) {
|
||||
@@ -634,11 +632,20 @@ export function recomputeNextRunsForMaintenance(
|
||||
now >= job.state.nextRunAtMs &&
|
||||
typeof job.state.runningAtMs !== "number"
|
||||
) {
|
||||
// Only advance when the expired slot was already executed.
|
||||
// If not, preserve the past-due value so the job can still run.
|
||||
// Only advance when the expired slot was already executed, or when
|
||||
// old start-based retry state predates the active run-end backoff.
|
||||
// Otherwise preserve the past-due value so the job can still run.
|
||||
const lastRun = job.state.lastRunAtMs;
|
||||
const alreadyExecutedSlot = isFiniteTimestamp(lastRun) && lastRun >= job.state.nextRunAtMs;
|
||||
if (alreadyExecutedSlot) {
|
||||
const backoffUntilMs = resolveJobErrorBackoffUntilMs(
|
||||
job,
|
||||
state.deps.cronConfig?.retry?.backoffMs ?? DEFAULT_ERROR_BACKOFF_SCHEDULE_MS,
|
||||
);
|
||||
const isStaleBackoffSlot =
|
||||
backoffUntilMs !== undefined &&
|
||||
now < backoffUntilMs &&
|
||||
job.state.nextRunAtMs < backoffUntilMs;
|
||||
if (alreadyExecutedSlot || isStaleBackoffSlot) {
|
||||
if (recomputeJobNextRunAtMs({ state, job, nowMs: now })) {
|
||||
changed = true;
|
||||
}
|
||||
|
||||
@@ -57,6 +57,7 @@ import {
|
||||
nextWakeAtMs,
|
||||
recomputeNextRunsForMaintenance,
|
||||
recordScheduleComputeError,
|
||||
resolveJobErrorBackoffUntilMs,
|
||||
resolveJobPayloadTextForMain,
|
||||
} from "./jobs.js";
|
||||
import { locked } from "./locked.js";
|
||||
@@ -1467,6 +1468,7 @@ export async function onTimer(state: CronServiceState) {
|
||||
}
|
||||
|
||||
function isRunnableJob(params: {
|
||||
state: CronServiceState;
|
||||
job: CronJob;
|
||||
nowMs: number;
|
||||
skipJobIds?: ReadonlySet<string>;
|
||||
@@ -1504,14 +1506,14 @@ function isRunnableJob(params: {
|
||||
return false;
|
||||
}
|
||||
const next = job.state.nextRunAtMs;
|
||||
if (isErrorBackoffPending(params.state, job, nowMs)) {
|
||||
// Error retry windows are anchored at run end; persisted start-based
|
||||
// retry timestamps from older state must not bypass active backoff.
|
||||
return false;
|
||||
}
|
||||
if (hasScheduledNextRunAtMs(next) && nowMs >= next) {
|
||||
return true;
|
||||
}
|
||||
if (hasScheduledNextRunAtMs(next) && next > nowMs && isErrorBackoffPending(job, nowMs)) {
|
||||
// Respect active retry backoff windows on restart, but allow missed-slot
|
||||
// replay once the backoff window has elapsed.
|
||||
return false;
|
||||
}
|
||||
if (!params.allowCronMissedRunByLastRun || job.schedule.kind !== "cron") {
|
||||
return false;
|
||||
}
|
||||
@@ -1532,20 +1534,15 @@ function isRunnableJob(params: {
|
||||
return previousRunAtMs > lastRunAtMs;
|
||||
}
|
||||
|
||||
function isErrorBackoffPending(job: CronJob, nowMs: number): boolean {
|
||||
function isErrorBackoffPending(state: CronServiceState, job: CronJob, nowMs: number): boolean {
|
||||
if (job.schedule.kind === "at" || job.state.lastStatus !== "error") {
|
||||
return false;
|
||||
}
|
||||
const lastRunAtMs = job.state.lastRunAtMs;
|
||||
if (typeof lastRunAtMs !== "number" || !Number.isFinite(lastRunAtMs)) {
|
||||
return false;
|
||||
}
|
||||
const consecutiveErrorsRaw = job.state.consecutiveErrors;
|
||||
const consecutiveErrors =
|
||||
typeof consecutiveErrorsRaw === "number" && Number.isFinite(consecutiveErrorsRaw)
|
||||
? Math.max(1, Math.floor(consecutiveErrorsRaw))
|
||||
: 1;
|
||||
return nowMs < lastRunAtMs + errorBackoffMs(consecutiveErrors);
|
||||
const backoffUntilMs = resolveJobErrorBackoffUntilMs(
|
||||
job,
|
||||
state.deps.cronConfig?.retry?.backoffMs ?? DEFAULT_ERROR_BACKOFF_SCHEDULE_MS,
|
||||
);
|
||||
return backoffUntilMs !== undefined && nowMs < backoffUntilMs;
|
||||
}
|
||||
|
||||
function collectRunnableJobs(
|
||||
@@ -1562,6 +1559,7 @@ function collectRunnableJobs(
|
||||
}
|
||||
return state.store.jobs.filter((job) =>
|
||||
isRunnableJob({
|
||||
state,
|
||||
job,
|
||||
nowMs,
|
||||
skipJobIds: opts?.skipJobIds,
|
||||
@@ -1571,6 +1569,55 @@ function collectRunnableJobs(
|
||||
);
|
||||
}
|
||||
|
||||
function deferPendingBackoffMissedCronSlots(
|
||||
state: CronServiceState,
|
||||
nowMs: number,
|
||||
opts?: { skipJobIds?: ReadonlySet<string> },
|
||||
): boolean {
|
||||
if (!state.store) {
|
||||
return false;
|
||||
}
|
||||
let changed = false;
|
||||
for (const job of state.store.jobs) {
|
||||
if (
|
||||
!isJobEnabled(job) ||
|
||||
job.schedule.kind !== "cron" ||
|
||||
opts?.skipJobIds?.has(job.id) ||
|
||||
typeof job.state.runningAtMs === "number"
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
const backoffUntilMs = resolveJobErrorBackoffUntilMs(
|
||||
job,
|
||||
state.deps.cronConfig?.retry?.backoffMs ?? DEFAULT_ERROR_BACKOFF_SCHEDULE_MS,
|
||||
);
|
||||
if (backoffUntilMs === undefined || nowMs >= backoffUntilMs) {
|
||||
continue;
|
||||
}
|
||||
let previousRunAtMs: number | undefined;
|
||||
try {
|
||||
previousRunAtMs = computeJobPreviousRunAtMs(job, nowMs);
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
const lastRunAtMs = job.state.lastRunAtMs;
|
||||
if (
|
||||
typeof previousRunAtMs !== "number" ||
|
||||
!Number.isFinite(previousRunAtMs) ||
|
||||
typeof lastRunAtMs !== "number" ||
|
||||
!Number.isFinite(lastRunAtMs) ||
|
||||
previousRunAtMs <= lastRunAtMs
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
if (job.state.nextRunAtMs !== backoffUntilMs) {
|
||||
job.state.nextRunAtMs = backoffUntilMs;
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
return changed;
|
||||
}
|
||||
|
||||
export async function runMissedJobs(
|
||||
state: CronServiceState,
|
||||
opts?: { skipJobIds?: ReadonlySet<string>; deferAgentTurnJobs?: boolean },
|
||||
@@ -1599,12 +1646,18 @@ async function planStartupCatchup(
|
||||
}
|
||||
|
||||
const now = state.deps.nowMs();
|
||||
const deferredBackoffMissedSlot = deferPendingBackoffMissedCronSlots(state, now, {
|
||||
skipJobIds: opts?.skipJobIds,
|
||||
});
|
||||
const missed = collectRunnableJobs(state, now, {
|
||||
skipJobIds: opts?.skipJobIds,
|
||||
skipAtIfAlreadyRan: true,
|
||||
allowCronMissedRunByLastRun: true,
|
||||
});
|
||||
if (missed.length === 0) {
|
||||
if (deferredBackoffMissedSlot) {
|
||||
await persist(state);
|
||||
}
|
||||
return { candidates: [], deferredJobs: [] };
|
||||
}
|
||||
const sorted = missed.toSorted(
|
||||
|
||||
Reference in New Issue
Block a user