From 0df78eda10b60dab08423199e9d9ef34d0276c0a Mon Sep 17 00:00:00 2001 From: Gustavo Madeira Santana Date: Mon, 20 Apr 2026 12:00:18 -0400 Subject: [PATCH] cron: recover missing split state sidecar --- src/cron/store.test.ts | 57 ++++++++++++++++++++++++++++++++++++++++++ src/cron/store.ts | 42 ++++++++++++++++++++----------- 2 files changed, 85 insertions(+), 14 deletions(-) diff --git a/src/cron/store.test.ts b/src/cron/store.test.ts index d434e78385e..9563765e720 100644 --- a/src/cron/store.test.ts +++ b/src/cron/store.test.ts @@ -186,6 +186,63 @@ describe("cron store", () => { expect(loaded.jobs[0]?.state.nextRunAtMs).toBe(first.jobs[0].createdAtMs + 60_000); }); + it("recreates a missing state sidecar without rewriting unchanged config", async () => { + const store = await makeStorePath(); + const statePath = store.storePath.replace(/\.json$/, "-state.json"); + const payload = makeStore("job-1", true); + payload.jobs[0].state = { nextRunAtMs: payload.jobs[0].createdAtMs + 60_000 }; + + await saveCronStore(store.storePath, payload); + await loadCronStore(store.storePath); + const configRawBefore = await fs.readFile(store.storePath, "utf-8"); + await fs.rm(statePath); + + const renamedDestinations: string[] = []; + const origRename = fs.rename.bind(fs); + const spy = vi.spyOn(fs, "rename").mockImplementation(async (src, dest) => { + renamedDestinations.push(String(dest)); + return origRename(src, dest); + }); + + try { + await saveCronStore(store.storePath, payload); + } finally { + spy.mockRestore(); + } + + const configRawAfter = await fs.readFile(store.storePath, "utf-8"); + const stateFile = JSON.parse(await fs.readFile(statePath, "utf-8")); + + expect(configRawAfter).toBe(configRawBefore); + expect(renamedDestinations).toContain(statePath); + expect(renamedDestinations).not.toContain(store.storePath); + expect(stateFile.jobs["job-1"].state.nextRunAtMs).toBe(payload.jobs[0].createdAtMs + 60_000); + }); + + it("migrates legacy inline state into the state sidecar", async () => { + const store = await makeStorePath(); + const statePath = store.storePath.replace(/\.json$/, "-state.json"); + const legacy = makeStore("job-1", true); + legacy.jobs[0].state = { + lastRunAtMs: legacy.jobs[0].createdAtMs + 30_000, + nextRunAtMs: legacy.jobs[0].createdAtMs + 60_000, + }; + + await fs.mkdir(path.dirname(store.storePath), { recursive: true }); + await fs.writeFile(store.storePath, JSON.stringify(legacy, null, 2), "utf-8"); + + const loaded = await loadCronStore(store.storePath); + await saveCronStore(store.storePath, loaded); + + const config = JSON.parse(await fs.readFile(store.storePath, "utf-8")); + const stateFile = JSON.parse(await fs.readFile(statePath, "utf-8")); + + expect(config.jobs[0]).not.toHaveProperty("updatedAtMs"); + expect(config.jobs[0].state).toEqual({}); + expect(stateFile.jobs["job-1"].updatedAtMs).toBe(legacy.jobs[0].updatedAtMs); + expect(stateFile.jobs["job-1"].state.nextRunAtMs).toBe(legacy.jobs[0].createdAtMs + 60_000); + }); + it("sanitizes invalid updatedAtMs values from the state sidecar", async () => { const store = await makeStorePath(); const job = makeStore("job-1", true).jobs[0]; diff --git a/src/cron/store.ts b/src/cron/store.ts index af1108ece54..be826580b86 100644 --- a/src/cron/store.ts +++ b/src/cron/store.ts @@ -7,6 +7,7 @@ import { parseJsonWithJson5Fallback } from "../utils/parse-json-compat.js"; import type { CronStoreFile } from "./types.js"; const serializedStoreCache = new Map(); +const storesNeedingSplitMigration = new Set(); function resolveDefaultCronDir(): string { return path.join(resolveConfigDir(), "cron"); @@ -146,6 +147,8 @@ export async function loadCronStore(storePath: string): Promise { // Load state file and merge. const statePath = resolveStatePath(storePath); const stateFile = await loadStateFile(statePath); + const hasLegacyInlineState = + !stateFile && hasInlineState(jobs as unknown as Array>); if (stateFile) { // State file exists: merge state by job ID. Inline state in jobs.json is ignored. @@ -158,7 +161,7 @@ export async function loadCronStore(storePath: string): Promise { backfillMissingRuntimeFields(job); } } - } else if (!hasInlineState(jobs as unknown as Array>)) { + } else if (!hasLegacyInlineState) { // No state file, no inline state: fresh clone or first run. for (const job of store.jobs) { backfillMissingRuntimeFields(job); @@ -172,9 +175,13 @@ export async function loadCronStore(storePath: string): Promise { } const configJson = JSON.stringify(stripRuntimeOnlyCronFields(store), null, 2); + const stateJson = JSON.stringify(extractStateFile(store), null, 2); serializedStoreCache.set(storePath, configJson); - if (stateFile) { - serializedStoreCache.set(`${storePath}:state`, JSON.stringify(stateFile, null, 2)); + serializedStoreCache.set(`${storePath}:state`, stateJson); + if (hasLegacyInlineState) { + storesNeedingSplitMigration.add(storePath); + } else { + storesNeedingSplitMigration.delete(storePath); } return store; @@ -182,6 +189,7 @@ export async function loadCronStore(storePath: string): Promise { if ((err as { code?: unknown })?.code === "ENOENT") { serializedStoreCache.delete(storePath); serializedStoreCache.delete(`${storePath}:state`); + storesNeedingSplitMigration.delete(storePath); return { version: 1, jobs: [] }; } throw err; @@ -223,23 +231,28 @@ export async function saveCronStore( const configChanged = cachedConfig !== configJson; const stateChanged = cachedState !== stateJson; + const migrating = storesNeedingSplitMigration.has(storePath); + let stateNeedsWrite = stateChanged; - if (!configChanged && !stateChanged) { - return; - } - - // Detect migration: state file does not exist on disk yet. - let migrating = false; - if (!cachedState) { + if (!stateNeedsWrite) { try { - await fs.promises.access(statePath, fs.constants.F_OK); - } catch { - migrating = true; + const diskStateJson = await fs.promises.readFile(statePath, "utf-8"); + stateNeedsWrite = diskStateJson !== stateJson; + } catch (err) { + if ((err as { code?: unknown })?.code === "ENOENT") { + stateNeedsWrite = true; + } else { + throw err; + } } } + if (!configChanged && !stateNeedsWrite && !migrating) { + return; + } + // Write state first so migration never leaves stripped config without runtime state. - if (stateChanged || migrating) { + if (stateNeedsWrite || migrating) { await atomicWrite(statePath, stateJson); serializedStoreCache.set(stateCacheKey, stateJson); } @@ -259,6 +272,7 @@ export async function saveCronStore( await atomicWrite(storePath, configJson); serializedStoreCache.set(storePath, configJson); } + storesNeedingSplitMigration.delete(storePath); } const RENAME_MAX_RETRIES = 3;