fix(cron): resume interrupted recurring jobs on first restart (#60495)

This commit is contained in:
joelnishanth
2026-04-03 18:14:11 -05:00
committed by Ayaan Zaidi
parent 9e389cff3d
commit 7a16e14301
3 changed files with 28 additions and 16 deletions

View File

@@ -115,7 +115,7 @@ describe("CronService restart catch-up", () => {
);
});
it("clears stale running markers without replaying interrupted startup jobs", async () => {
it("replays interrupted recurring job on first restart (#60495)", async () => {
const dueAt = Date.parse("2025-12-13T16:00:00.000Z");
const staleRunningAt = Date.parse("2025-12-13T16:30:00.000Z");
@@ -137,21 +137,23 @@ describe("CronService restart catch-up", () => {
},
},
],
async ({ cron, enqueueSystemEvent }) => {
expect(enqueueSystemEvent).not.toHaveBeenCalled();
async ({ cron, enqueueSystemEvent, requestHeartbeatNow }) => {
expect(noopLogger.warn).toHaveBeenCalledWith(
expect.objectContaining({ jobId: "restart-stale-running" }),
"cron: clearing stale running marker on startup",
);
expect(enqueueSystemEvent).toHaveBeenCalledWith(
"resume stale marker",
expect.objectContaining({ agentId: undefined }),
);
expect(requestHeartbeatNow).toHaveBeenCalled();
const listedJobs = await cron.list({ includeDisabled: true });
const updated = listedJobs.find((job) => job.id === "restart-stale-running");
expect(updated?.state.runningAtMs).toBeUndefined();
expect(updated?.state.lastStatus).toBeUndefined();
expect(updated?.state.lastRunAtMs).toBeUndefined();
expect((updated?.state.nextRunAtMs ?? 0) > Date.parse("2025-12-13T17:00:00.000Z")).toBe(
true,
);
expect(updated?.state.lastStatus).toBe("ok");
expect(updated?.state.lastRunAtMs).toBe(Date.parse("2025-12-13T17:00:00.000Z"));
},
);
});

View File

@@ -65,7 +65,7 @@ function createMissedIsolatedJob(now: number): CronJob {
}
describe("cron service ops seam coverage", () => {
it("start clears stale running markers, skips startup replay, persists, and arms the timer", async () => {
it("start clears stale running markers, replays interrupted recurring jobs, persists, and arms the timer (#60495)", async () => {
const { storePath } = await makeStorePath();
const now = Date.parse("2026-03-23T12:00:00.000Z");
const enqueueSystemEvent = vi.fn();
@@ -93,8 +93,9 @@ describe("cron service ops seam coverage", () => {
expect.objectContaining({ jobId: "startup-interrupted" }),
"cron: clearing stale running marker on startup",
);
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeatNow).not.toHaveBeenCalled();
// Interrupted recurring jobs are now replayed on first restart (#60495)
expect(enqueueSystemEvent).toHaveBeenCalled();
expect(requestHeartbeatNow).toHaveBeenCalled();
expect(state.timer).not.toBeNull();
const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as {
@@ -103,7 +104,7 @@ describe("cron service ops seam coverage", () => {
const job = persisted.jobs[0];
expect(job).toBeDefined();
expect(job?.state.runningAtMs).toBeUndefined();
expect(job?.state.lastStatus).toBeUndefined();
expect(job?.state.lastStatus).toBe("ok");
expect((job?.state.nextRunAtMs ?? 0) > now).toBe(true);
const delays = timeoutSpy.mock.calls

View File

@@ -101,7 +101,8 @@ export async function start(state: CronServiceState) {
return;
}
const startupInterruptedJobIds = new Set<string>();
const interruptedOneShotIds = new Set<string>();
let clearedAnyRunningMarker = false;
await locked(state, async () => {
await ensureLoaded(state, { skipRecompute: true });
const jobs = state.store?.jobs ?? [];
@@ -112,15 +113,23 @@ export async function start(state: CronServiceState) {
"cron: clearing stale running marker on startup",
);
job.state.runningAtMs = undefined;
startupInterruptedJobIds.add(job.id);
clearedAnyRunningMarker = true;
// One-shot jobs are not retried after interruption; recurring jobs
// (cron/every) are eligible for startup catch-up so they don't
// require a second restart to recover (#60495).
if (job.schedule.kind === "at") {
interruptedOneShotIds.add(job.id);
}
}
}
if (startupInterruptedJobIds.size > 0) {
if (clearedAnyRunningMarker) {
await persist(state);
}
});
await runMissedJobs(state, { skipJobIds: startupInterruptedJobIds });
await runMissedJobs(state, {
skipJobIds: interruptedOneShotIds.size > 0 ? interruptedOneShotIds : undefined,
});
await locked(state, async () => {
// Startup catch-up already persisted the latest in-memory store state, and