mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-12 07:20:45 +00:00
fix: prevent cron jobs from skipping execution when nextRunAtMs advances (#14068)
Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
This commit is contained in:
130
src/cron/service.issue-13992-regression.test.ts
Normal file
130
src/cron/service.issue-13992-regression.test.ts
Normal file
@@ -0,0 +1,130 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import type { CronServiceState } from "./service/state.js";
|
||||
import type { CronJob } from "./types.js";
|
||||
import { recomputeNextRunsForMaintenance } from "./service/jobs.js";
|
||||
|
||||
describe("issue #13992 regression - cron jobs skip execution", () => {
  const noop = () => {};

  /** Wraps the given jobs in a minimal in-memory service state with a silent logger. */
  const createMockState = (jobs: CronJob[]): CronServiceState => ({
    store: { version: 1, jobs },
    running: false,
    timer: null,
    storeLoadedAtMs: Date.now(),
    deps: {
      storePath: "/mock/path",
      cronEnabled: true,
      nowMs: () => Date.now(),
      log: { debug: noop, info: noop, warn: noop, error: noop } as never,
    },
  });

  /**
   * Builds the daily 08:00 UTC fixture job shared by every case.
   *
   * @param stampMs - value used for both createdAtMs and updatedAtMs
   * @param state - initial runtime state of the job
   * @param enabled - whether the job is enabled (defaults to true)
   */
  const makeJob = (stampMs: number, state: CronJob["state"], enabled = true): CronJob => ({
    id: "test-job",
    name: "test job",
    enabled,
    schedule: { kind: "cron", expr: "0 8 * * *", tz: "UTC" },
    payload: { kind: "systemEvent", text: "test" },
    sessionTarget: "main",
    createdAtMs: stampMs,
    updatedAtMs: stampMs,
    state,
  });

  it("should NOT recompute nextRunAtMs for past-due jobs during maintenance", () => {
    const now = Date.now();
    const pastDue = now - 60_000; // one minute in the past

    // A past-due nextRunAtMs must survive the maintenance pass untouched.
    const job = makeJob(now - 3600_000, { nextRunAtMs: pastDue });

    recomputeNextRunsForMaintenance(createMockState([job]));

    expect(job.state.nextRunAtMs).toBe(pastDue);
  });

  it("should compute missing nextRunAtMs during maintenance", () => {
    const now = Date.now();

    // The job starts with an empty state, i.e. no nextRunAtMs at all.
    const job = makeJob(now, {});

    recomputeNextRunsForMaintenance(createMockState([job]));

    // Maintenance fills in the missing value with a future timestamp.
    expect(typeof job.state.nextRunAtMs).toBe("number");
    expect(job.state.nextRunAtMs).toBeGreaterThan(now);
  });

  it("should clear nextRunAtMs for disabled jobs during maintenance", () => {
    const now = Date.now();
    const futureTime = now + 3600_000;

    // Disabled job that still carries a scheduled run time.
    const job = makeJob(now, { nextRunAtMs: futureTime }, false);

    recomputeNextRunsForMaintenance(createMockState([job]));

    expect(job.state.nextRunAtMs).toBeUndefined();
  });

  it("should clear stuck running markers during maintenance", () => {
    const now = Date.now();
    const stuckTime = now - 3 * 60 * 60_000; // 3 hours ago (> 2 hour threshold)
    const futureTime = now + 3600_000;

    const job = makeJob(now, {
      nextRunAtMs: futureTime,
      runningAtMs: stuckTime, // stale running marker
    });

    recomputeNextRunsForMaintenance(createMockState([job]));

    // The stale marker is dropped ...
    expect(job.state.runningAtMs).toBeUndefined();
    // ... while the still-future nextRunAtMs is left alone.
    expect(job.state.nextRunAtMs).toBe(futureTime);
  });
});
|
||||
@@ -163,6 +163,58 @@ export function recomputeNextRuns(state: CronServiceState): boolean {
|
||||
return changed;
|
||||
}
|
||||
|
||||
/**
 * Maintenance pass over the stored jobs that deliberately leaves any existing
 * `nextRunAtMs` of an enabled job untouched, so a past-due run time is never
 * advanced without the job having been executed (see #13992).
 *
 * For every job in the store it:
 * - creates an empty `state` object when the job has none,
 * - clears `nextRunAtMs` and `runningAtMs` when the job is disabled,
 * - clears (and logs a warning for) a `runningAtMs` marker older than
 *   `STUCK_RUN_MS`,
 * - fills in `nextRunAtMs` only when it is currently `undefined`.
 *
 * @param state - cron service state; jobs inside `state.store` are mutated in place
 * @returns `true` when at least one job was modified, `false` otherwise
 *   (including when no store is loaded)
 */
export function recomputeNextRunsForMaintenance(state: CronServiceState): boolean {
  const store = state.store;
  if (!store) {
    return false;
  }

  const nowMs = state.deps.nowMs();
  let dirty = false;

  for (const job of store.jobs) {
    if (!job.state) {
      job.state = {};
      dirty = true;
    }
    const jobState = job.state;

    if (!job.enabled) {
      // Disabled jobs carry neither a scheduled run nor a running marker.
      for (const field of ["nextRunAtMs", "runningAtMs"] as const) {
        if (jobState[field] !== undefined) {
          jobState[field] = undefined;
          dirty = true;
        }
      }
      continue;
    }

    const startedAt = jobState.runningAtMs;
    if (typeof startedAt === "number" && nowMs - startedAt > STUCK_RUN_MS) {
      state.deps.log.warn(
        { jobId: job.id, runningAtMs: startedAt },
        "cron: clearing stuck running marker",
      );
      jobState.runningAtMs = undefined;
      dirty = true;
    }

    // An existing nextRunAtMs is kept as-is, even when it lies in the past:
    // replacing it here would silently skip a run that was never executed.
    if (jobState.nextRunAtMs !== undefined) {
      continue;
    }

    const computedNext = computeJobNextRunAtMs(job, nowMs);
    if (computedNext === undefined) {
      continue;
    }
    jobState.nextRunAtMs = computedNext;
    dirty = true;
  }

  return dirty;
}
|
||||
|
||||
export function nextWakeAtMs(state: CronServiceState) {
|
||||
const jobs = state.store?.jobs ?? [];
|
||||
const enabled = jobs.filter((j) => j.enabled && typeof j.state.nextRunAtMs === "number");
|
||||
|
||||
@@ -8,6 +8,7 @@ import {
|
||||
computeJobNextRunAtMs,
|
||||
nextWakeAtMs,
|
||||
recomputeNextRuns,
|
||||
recomputeNextRunsForMaintenance,
|
||||
resolveJobPayloadTextForMain,
|
||||
} from "./jobs.js";
|
||||
import { locked } from "./locked.js";
|
||||
@@ -187,7 +188,10 @@ export async function onTimer(state: CronServiceState) {
|
||||
const due = findDueJobs(state);
|
||||
|
||||
if (due.length === 0) {
|
||||
const changed = recomputeNextRuns(state);
|
||||
// Use maintenance-only recompute to avoid advancing past-due nextRunAtMs
|
||||
// values without execution. This prevents jobs from being silently skipped
|
||||
// when the timer wakes up but findDueJobs returns empty (see #13992).
|
||||
const changed = recomputeNextRunsForMaintenance(state);
|
||||
if (changed) {
|
||||
await persist(state);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user