mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 15:00:41 +00:00
fix(cron): preserve unresolved next-run backoff (#66113)
Merged via squash.
Prepared head SHA: a553daa7eb
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Reviewed-by: @mbelinky
This commit is contained in:
@@ -25,6 +25,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Gateway/sessions: stop heartbeat, cron-event, and exec-event turns from overwriting shared-session routing and origin metadata, preventing synthetic `heartbeat` targets from poisoning later cron or user delivery. (#63733, #35300)
|
||||
- Browser/CDP: let local attach-only `manual-cdp` profiles reuse the local loopback CDP control plane under strict default policy and remote-class probe timeouts, so tabs/snapshot stop falsely reporting a live local browser session as not running. (#65611, #66080) Thanks @mbelinky.
|
||||
- Cron/scheduler: stop inventing short retries when cron next-run calculation returns no valid future slot, and keep a maintenance wake armed so enabled unscheduled jobs recover without entering a refire loop. (#66019, #66083) Thanks @mbelinky.
|
||||
- Cron/scheduler: preserve the active error-backoff floor when maintenance repair recomputes a missing cron next-run, so recurring errored jobs do not resume early after a transient next-run resolution failure. (#66019, #66083, #66113) Thanks @mbelinky.
|
||||
|
||||
## 2026.4.12
|
||||
|
||||
|
||||
@@ -111,4 +111,61 @@ describe("#66019 unresolved next-run repro", () => {
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Regression test: after a failed run whose next-run computation initially
// returns no slot, a later recompute that does find a slot must not schedule
// the job earlier than the error-backoff time.
it("preserves the active error backoff floor when maintenance repair later finds a natural next run", async () => {
|
||||
const store = issue66019Fixtures.makeStorePath();
|
||||
const scheduledAt = Date.parse("2026-04-13T15:50:00.000Z");
|
||||
// Mutable clock value; the service reads it through `nowMs` below.
let now = scheduledAt;
|
||||
|
||||
const cronJob = createIsolatedRegressionJob({
|
||||
id: "cron-66019-error-backoff-floor",
|
||||
name: "cron-66019-error-backoff-floor",
|
||||
scheduledAt,
|
||||
schedule: { kind: "cron", expr: "0 7 * * *", tz: "Asia/Shanghai" },
|
||||
payload: { kind: "agentTurn", message: "ping" },
|
||||
// Stored next run is one second before the initial value of `now`.
state: { nextRunAtMs: scheduledAt - 1_000 },
|
||||
});
|
||||
await writeCronJobs(store.storePath, [cronJob]);
|
||||
|
||||
// Every job execution resolves with an error result.
const runIsolatedAgentJob = vi.fn().mockResolvedValue({
|
||||
status: "error",
|
||||
error: "synthetic failure",
|
||||
});
|
||||
// Slot the mocked schedule computation reports once it starts resolving.
const naturalNext = scheduledAt + 5_000;
|
||||
// 30 s equals the first entry of the default error backoff schedule.
const backoffNext = scheduledAt + 30_000;
|
||||
// First call yields no next run (undefined); every later call yields naturalNext.
const nextRunSpy = vi
|
||||
.spyOn(schedule, "computeNextRunAtMs")
|
||||
.mockReturnValueOnce(undefined)
|
||||
.mockReturnValueOnce(naturalNext)
|
||||
.mockReturnValue(naturalNext);
|
||||
const state = createCronServiceState({
|
||||
cronEnabled: true,
|
||||
storePath: store.storePath,
|
||||
log: noopLogger,
|
||||
nowMs: () => now,
|
||||
enqueueSystemEvent: vi.fn(),
|
||||
requestHeartbeatNow: vi.fn(),
|
||||
runIsolatedAgentJob,
|
||||
});
|
||||
|
||||
try {
|
||||
// Phase 1: the job runs once, fails, and is expected to land on the backoff time.
await onTimer(state);
|
||||
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1);
|
||||
expect(state.store?.jobs[0]?.state.nextRunAtMs).toBe(backoffNext);
|
||||
|
||||
// Phase 2: just past the natural slot but still before the backoff time — no new run.
now = naturalNext + 1;
|
||||
await onTimer(state);
|
||||
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1);
|
||||
|
||||
// Phase 3: just past the backoff time — the job runs a second time.
now = backoffNext + 1;
|
||||
await onTimer(state);
|
||||
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(2);
|
||||
} finally {
|
||||
// Undo the spy and clear any timer still held on the state.
nextRunSpy.mockRestore();
|
||||
if (state.timer) {
|
||||
clearTimeout(state.timer);
|
||||
state.timer = null;
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
@@ -38,6 +38,13 @@ import type { CronServiceState } from "./state.js";
|
||||
const STUCK_RUN_MS = 2 * 60 * 60 * 1000;
|
||||
const STAGGER_OFFSET_CACHE_MAX = 4096;
|
||||
const staggerOffsetCache = new Map<string, number>();
|
||||
/**
 * Error backoff delays (in ms) indexed by consecutive error count.
 * `errorBackoffMs` reads entry `consecutiveErrors - 1` and clamps to the last
 * entry, so the delay stays constant once the list is exhausted.
 */
export const DEFAULT_ERROR_BACKOFF_SCHEDULE_MS = [
|
||||
30_000, // 1st error → 30 s
|
||||
60_000, // 2nd error → 1 min
|
||||
5 * 60_000, // 3rd error → 5 min
|
||||
15 * 60_000, // 4th error → 15 min
|
||||
60 * 60_000, // 5th+ error → 60 min
|
||||
];
|
||||
|
||||
function isFiniteTimestamp(value: unknown): value is number {
|
||||
return typeof value === "number" && Number.isFinite(value);
|
||||
@@ -47,6 +54,14 @@ export function hasScheduledNextRunAtMs(value: unknown): value is number {
|
||||
return isFiniteTimestamp(value) && value > 0;
|
||||
}
|
||||
|
||||
/**
 * Looks up the backoff delay for a given number of consecutive errors.
 *
 * @param consecutiveErrors - Consecutive error count; 1 selects the first entry.
 * @param scheduleMs - Delay table in milliseconds; defaults to
 *   `DEFAULT_ERROR_BACKOFF_SCHEDULE_MS`.
 * @returns The selected delay in milliseconds, or the first default delay when
 *   the lookup yields no entry.
 */
export function errorBackoffMs(
|
||||
consecutiveErrors: number,
|
||||
scheduleMs = DEFAULT_ERROR_BACKOFF_SCHEDULE_MS,
|
||||
): number {
|
||||
// Upper clamp: counts beyond the table length reuse its last entry.
const idx = Math.min(consecutiveErrors - 1, scheduleMs.length - 1);
|
||||
// Lower clamp to index 0; if that slot is empty (e.g. an empty table), use the first default delay.
return scheduleMs[Math.max(0, idx)] ?? DEFAULT_ERROR_BACKOFF_SCHEDULE_MS[0];
|
||||
}
|
||||
|
||||
function resolveStableCronOffsetMs(jobId: string, staggerMs: number) {
|
||||
if (staggerMs <= 1) {
|
||||
return 0;
|
||||
@@ -421,7 +436,27 @@ function walkSchedulableJobs(
|
||||
function recomputeJobNextRunAtMs(params: { state: CronServiceState; job: CronJob; nowMs: number }) {
|
||||
let changed = false;
|
||||
try {
|
||||
const newNext = computeJobNextRunAtMs(params.job, params.nowMs);
|
||||
let newNext = computeJobNextRunAtMs(params.job, params.nowMs);
|
||||
if (
|
||||
params.job.schedule.kind !== "at" &&
|
||||
params.job.state.lastStatus === "error" &&
|
||||
isFiniteTimestamp(params.job.state.lastRunAtMs)
|
||||
) {
|
||||
const consecutiveErrorsRaw = params.job.state.consecutiveErrors;
|
||||
const consecutiveErrors =
|
||||
typeof consecutiveErrorsRaw === "number" && Number.isFinite(consecutiveErrorsRaw)
|
||||
? Math.max(1, Math.floor(consecutiveErrorsRaw))
|
||||
: 1;
|
||||
const backoffFloor =
|
||||
params.job.state.lastRunAtMs +
|
||||
errorBackoffMs(
|
||||
consecutiveErrors,
|
||||
params.state.deps.cronConfig?.retry?.backoffMs ?? DEFAULT_ERROR_BACKOFF_SCHEDULE_MS,
|
||||
);
|
||||
if (newNext !== undefined) {
|
||||
newNext = Math.max(newNext, backoffFloor);
|
||||
}
|
||||
}
|
||||
if (params.job.state.nextRunAtMs !== newNext) {
|
||||
params.job.state.nextRunAtMs = newNext;
|
||||
changed = true;
|
||||
|
||||
@@ -20,8 +20,10 @@ import type {
|
||||
CronRunTelemetry,
|
||||
} from "../types.js";
|
||||
import {
|
||||
DEFAULT_ERROR_BACKOFF_SCHEDULE_MS,
|
||||
computeJobPreviousRunAtMs,
|
||||
computeJobNextRunAtMs,
|
||||
errorBackoffMs,
|
||||
hasScheduledNextRunAtMs,
|
||||
isJobEnabled,
|
||||
nextWakeAtMs,
|
||||
@@ -199,26 +201,6 @@ function tryFinishCronTaskRun(
|
||||
);
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Exponential backoff delays (in ms) indexed by consecutive error count.
|
||||
* After the last entry the delay stays constant.
|
||||
*/
|
||||
const DEFAULT_BACKOFF_SCHEDULE_MS = [
|
||||
30_000, // 1st error → 30 s
|
||||
60_000, // 2nd error → 1 min
|
||||
5 * 60_000, // 3rd error → 5 min
|
||||
15 * 60_000, // 4th error → 15 min
|
||||
60 * 60_000, // 5th+ error → 60 min
|
||||
];
|
||||
|
||||
function errorBackoffMs(
|
||||
consecutiveErrors: number,
|
||||
scheduleMs = DEFAULT_BACKOFF_SCHEDULE_MS,
|
||||
): number {
|
||||
const idx = Math.min(consecutiveErrors - 1, scheduleMs.length - 1);
|
||||
return scheduleMs[Math.max(0, idx)];
|
||||
}
|
||||
|
||||
/** Default max retries for one-shot jobs on transient errors (#24355). */
|
||||
const DEFAULT_MAX_TRANSIENT_RETRIES = 3;
|
||||
|
||||
@@ -269,7 +251,7 @@ function resolveRetryConfig(cronConfig?: CronConfig) {
|
||||
backoffMs:
|
||||
Array.isArray(retry?.backoffMs) && retry.backoffMs.length > 0
|
||||
? retry.backoffMs
|
||||
: DEFAULT_BACKOFF_SCHEDULE_MS.slice(0, 3),
|
||||
: DEFAULT_ERROR_BACKOFF_SCHEDULE_MS.slice(0, 3),
|
||||
retryOn: Array.isArray(retry?.retryOn) && retry.retryOn.length > 0 ? retry.retryOn : undefined,
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user