From ad996e132093fac7644b3c0421236b2fb238bd16 Mon Sep 17 00:00:00 2001 From: Feelw00 Date: Wed, 8 Apr 2026 18:32:49 +0900 Subject: [PATCH] feat(cron): split jobs.json into config and runtime state files Separate cron store into jobs.json (config, git-trackable) and jobs-state.json (runtime state, gitignored) so that cron executions no longer produce meaningless diffs when tracking config in git. Closes #53581 Co-Authored-By: Claude Opus 4.6 (1M context) --- src/cron/service/jobs.ts | 1 + src/cron/service/ops.test.ts | 6 +- src/cron/service/ops.ts | 1 + src/cron/service/timer.test.ts | 6 +- src/cron/store.test.ts | 25 +++- src/cron/store.ts | 215 ++++++++++++++++++++++----------- src/cron/types-shared.ts | 1 + 7 files changed, 176 insertions(+), 79 deletions(-) diff --git a/src/cron/service/jobs.ts b/src/cron/service/jobs.ts index f9905d92a95..ade58ddb3d6 100644 --- a/src/cron/service/jobs.ts +++ b/src/cron/service/jobs.ts @@ -589,6 +589,7 @@ export function createJob(state: CronServiceState, input: CronJobCreate): CronJo deleteAfterRun, createdAtMs: now, updatedAtMs: now, + configUpdatedAtMs: now, schedule, sessionTarget: input.sessionTarget, wakeMode: input.wakeMode, diff --git a/src/cron/service/ops.test.ts b/src/cron/service/ops.test.ts index 85bfda60bf3..87e87824900 100644 --- a/src/cron/service/ops.test.ts +++ b/src/cron/service/ops.test.ts @@ -1,9 +1,9 @@ -import fs from "node:fs/promises"; import path from "node:path"; import { describe, expect, it, vi } from "vitest"; import * as detachedTaskRuntime from "../../tasks/detached-task-runtime.js"; import { findTaskByRunId, resetTaskRegistryForTests } from "../../tasks/task-registry.js"; import { setupCronServiceSuite, writeCronStoreSnapshot } from "../service.test-harness.js"; +import { loadCronStore } from "../store.js"; import type { CronJob } from "../types.js"; import { run, start, stop, update } from "./ops.js"; import { createCronServiceState } from "./state.js"; @@ -102,7 +102,7 @@ async function expectDueIsolatedManualRunProgresses(storePath: string, now: numb await expect(run(state, "isolated-timeout")).resolves.toEqual({ ok: true, ran: true }); - const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as { + const persisted = (await loadCronStore(storePath)) as { jobs: CronJob[]; }; expect(persisted.jobs[0]?.state.runningAtMs).toBeUndefined(); @@ -161,7 +161,7 @@ describe("cron service ops seam coverage", () => { expect(requestHeartbeatNow).toHaveBeenCalled(); expect(state.timer).not.toBeNull(); - const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as { + const persisted = (await loadCronStore(storePath)) as { jobs: CronJob[]; }; const job = persisted.jobs[0]; diff --git a/src/cron/service/ops.ts b/src/cron/service/ops.ts index 067cf35735f..2df74be42ae 100644 --- a/src/cron/service/ops.ts +++ b/src/cron/service/ops.ts @@ -307,6 +307,7 @@ export async function update(state: CronServiceState, id: string, patch: CronJob const enabledChanged = patch.enabled !== undefined; job.updatedAtMs = now; + job.configUpdatedAtMs = now; if (scheduleChanged || enabledChanged) { if (isJobEnabled(job)) { job.state.nextRunAtMs = computeJobNextRunAtMs(job, now); diff --git a/src/cron/service/timer.test.ts b/src/cron/service/timer.test.ts index 26033177247..d38a1dc754c 100644 --- a/src/cron/service/timer.test.ts +++ b/src/cron/service/timer.test.ts @@ -1,8 +1,8 @@ -import fs from "node:fs/promises"; import { afterEach, describe, expect, it, vi } from "vitest"; import { setupCronServiceSuite, writeCronStoreSnapshot } from "../../cron/service.test-harness.js"; import { createCronServiceState } from "../../cron/service/state.js"; import { onTimer } from "../../cron/service/timer.js"; +import { loadCronStore } from "../../cron/store.js"; import type { CronJob } from "../../cron/types.js"; import * as detachedTaskRuntime from "../../tasks/detached-task-runtime.js"; import { resetTaskRegistryForTests } from "../../tasks/task-registry.js"; @@ -68,9 +68,7 @@ describe("cron service timer seam coverage", () => { heartbeat: { target: "last" }, }); - const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as { - jobs: CronJob[]; - }; + const persisted = await loadCronStore(storePath); const job = persisted.jobs[0]; expect(job).toBeDefined(); expect(job?.state.lastStatus).toBe("ok"); diff --git a/src/cron/store.test.ts b/src/cron/store.test.ts index 90e04365396..68073256f29 100644 --- a/src/cron/store.test.ts +++ b/src/cron/store.test.ts @@ -109,8 +109,13 @@ describe("cron store", () => { const currentRaw = await fs.readFile(store.storePath, "utf-8"); const backupRaw = await fs.readFile(`${store.storePath}.bak`, "utf-8"); - expect(JSON.parse(currentRaw)).toEqual(second); - expect(JSON.parse(backupRaw)).toEqual(first); + const current = JSON.parse(currentRaw); + const backup = JSON.parse(backupRaw); + // jobs.json now contains config-only (state stripped to {}). + expect(current.jobs[0].id).toBe("job-2"); + expect(current.jobs[0].state).toEqual({}); + expect(backup.jobs[0].id).toBe("job-1"); + expect(backup.jobs[0].state).toEqual({}); }); it("skips backup files for runtime-only state churn", async () => { @@ -132,8 +137,20 @@ describe("cron store", () => { await saveCronStore(store.storePath, first); await saveCronStore(store.storePath, second); - const currentRaw = await fs.readFile(store.storePath, "utf-8"); - expect(JSON.parse(currentRaw)).toEqual(second); + // jobs.json should NOT be rewritten (only runtime changed). + const configRaw = await fs.readFile(store.storePath, "utf-8"); + const config = JSON.parse(configRaw); + expect(config.jobs[0].state).toEqual({}); + expect(config.jobs[0]).not.toHaveProperty("updatedAtMs"); + + // State file should contain runtime fields. + const statePath = store.storePath.replace(/\.json$/, "-state.json"); + const stateRaw = await fs.readFile(statePath, "utf-8"); + const stateFile = JSON.parse(stateRaw); + expect(stateFile.jobs[first.jobs[0].id].state.nextRunAtMs).toBe( + first.jobs[0].createdAtMs + 60_000, + ); + await expect(fs.stat(`${store.storePath}.bak`)).rejects.toThrow(); }); diff --git a/src/cron/store.ts b/src/cron/store.ts index d6fcaa337dd..80cdc1c8ad6 100644 --- a/src/cron/store.ts +++ b/src/cron/store.ts @@ -16,51 +16,39 @@ function resolveDefaultCronStorePath(): string { return path.join(resolveDefaultCronDir(), "jobs.json"); } +function resolveStatePath(storePath: string): string { + return storePath.replace(/\.json$/, "-state.json"); +} + +type CronStateFileEntry = { + updatedAtMs?: number; + state?: Record; +}; + +type CronStateFile = { + version: 1; + jobs: Record; +}; + function stripRuntimeOnlyCronFields(store: CronStoreFile): unknown { return { version: store.version, jobs: store.jobs.map((job) => { const { state: _state, updatedAtMs: _updatedAtMs, ...rest } = job; - return rest; + return { ...rest, state: {} }; }), }; } -function parseCronStoreForBackupComparison(raw: string): CronStoreFile | null { - try { - const parsed = parseJsonWithJson5Fallback(raw); - if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) { - return null; - } - const version = (parsed as { version?: unknown }).version; - const jobs = (parsed as { jobs?: unknown }).jobs; - if (version !== 1 || !Array.isArray(jobs)) { - return null; - } - return { - version: 1, - jobs: jobs.filter(Boolean) as CronStoreFile["jobs"], +function extractStateFile(store: CronStoreFile): CronStateFile { + const jobs: Record = {}; + for (const job of store.jobs) { + jobs[job.id] = { + updatedAtMs: job.updatedAtMs, + state: job.state ?? {}, }; - } catch { - return null; } -} - -function shouldSkipCronBackupForRuntimeOnlyChanges( - previousRaw: string | null, - nextStore: CronStoreFile, -): boolean { - if (previousRaw === null) { - return false; - } - const previous = parseCronStoreForBackupComparison(previousRaw); - if (!previous) { - return false; - } - return ( - JSON.stringify(stripRuntimeOnlyCronFields(previous)) === - JSON.stringify(stripRuntimeOnlyCronFields(nextStore)) - ); + return { version: 1, jobs }; } export function resolveCronStorePath(storePath?: string) { @@ -74,6 +62,37 @@ export function resolveCronStorePath(storePath?: string) { return resolveDefaultCronStorePath(); } +async function loadStateFile(statePath: string): Promise { + try { + const raw = await fs.promises.readFile(statePath, "utf-8"); + const parsed = parseJsonWithJson5Fallback(raw); + if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) { + return null; + } + const record = parsed as Record; + if (record.version !== 1 || typeof record.jobs !== "object" || record.jobs === null) { + return null; + } + return { version: 1, jobs: record.jobs as Record }; + } catch (err) { + if ((err as { code?: unknown })?.code === "ENOENT") { + return null; + } + // Best-effort: if state file is corrupt, treat as absent. + return null; + } +} + +function hasInlineState(jobs: Array>): boolean { + return jobs.some( + (job) => + job.state !== undefined && + typeof job.state === "object" && + job.state !== null && + Object.keys(job.state as Record).length > 0, + ); +} + export async function loadCronStore(storePath: string): Promise { try { const raw = await fs.promises.readFile(storePath, "utf-8"); @@ -94,11 +113,51 @@ export async function loadCronStore(storePath: string): Promise { version: 1 as const, jobs: jobs.filter(Boolean) as never as CronStoreFile["jobs"], }; - serializedStoreCache.set(storePath, JSON.stringify(store, null, 2)); + + // Load state file and merge. + const statePath = resolveStatePath(storePath); + const stateFile = await loadStateFile(statePath); + + if (stateFile) { + // State file exists: merge state by job ID. Inline state in jobs.json is ignored. + for (const job of store.jobs) { + const entry = stateFile.jobs[job.id]; + if (entry) { + job.updatedAtMs = entry.updatedAtMs ?? job.updatedAtMs; + job.state = (entry.state ?? {}) as never; + } else { + // Job exists in config but not in state file: default to empty state. + if (!job.state || typeof job.state !== "object") { + job.state = {} as never; + } + } + } + } else if (!hasInlineState(jobs as unknown as Array>)) { + // No state file, no inline state: fresh clone or first run. + for (const job of store.jobs) { + job.state = (job.state && typeof job.state === "object" ? job.state : {}) as never; + } + } + // else: migration mode — no state file but jobs.json has inline state. Use as-is. + + // Ensure every job has a state object (defensive). + for (const job of store.jobs) { + if (!job.state || typeof job.state !== "object") { + job.state = {} as never; + } + } + + const configJson = JSON.stringify(stripRuntimeOnlyCronFields(store), null, 2); + serializedStoreCache.set(storePath, configJson); + if (stateFile) { + serializedStoreCache.set(`${storePath}:state`, JSON.stringify(stateFile, null, 2)); + } + return store; } catch (err) { if ((err as { code?: unknown })?.code === "ENOENT") { serializedStoreCache.delete(storePath); + serializedStoreCache.delete(`${storePath}:state`); return { version: 1, jobs: [] }; } throw err; @@ -113,51 +172,71 @@ async function setSecureFileMode(filePath: string): Promise { await fs.promises.chmod(filePath, 0o600).catch(() => undefined); } +async function atomicWrite(filePath: string, content: string, dirMode = 0o700): Promise { + const dir = path.dirname(filePath); + await fs.promises.mkdir(dir, { recursive: true, mode: dirMode }); + await fs.promises.chmod(dir, dirMode).catch(() => undefined); + const tmp = `${filePath}.${process.pid}.${randomBytes(8).toString("hex")}.tmp`; + await fs.promises.writeFile(tmp, content, { encoding: "utf-8", mode: 0o600 }); + await setSecureFileMode(tmp); + await renameWithRetry(tmp, filePath); + await setSecureFileMode(filePath); +} + export async function saveCronStore( storePath: string, store: CronStoreFile, opts?: SaveCronStoreOptions, ) { - const storeDir = path.dirname(storePath); - await fs.promises.mkdir(storeDir, { recursive: true, mode: 0o700 }); - await fs.promises.chmod(storeDir, 0o700).catch(() => undefined); - const json = JSON.stringify(store, null, 2); - const cached = serializedStoreCache.get(storePath); - if (cached === json) { + const configJson = JSON.stringify(stripRuntimeOnlyCronFields(store), null, 2); + const stateFile = extractStateFile(store); + const stateJson = JSON.stringify(stateFile, null, 2); + + const statePath = resolveStatePath(storePath); + const configCacheKey = storePath; + const stateCacheKey = `${storePath}:state`; + + const cachedConfig = serializedStoreCache.get(configCacheKey); + const cachedState = serializedStoreCache.get(stateCacheKey); + + const configChanged = cachedConfig !== configJson; + const stateChanged = cachedState !== stateJson; + + if (!configChanged && !stateChanged) { return; } - let previous: string | null = cached ?? null; - if (previous === null) { + // Detect migration: state file does not exist on disk yet. + let migrating = false; + if (!cachedState) { try { - previous = await fs.promises.readFile(storePath, "utf-8"); - } catch (err) { - if ((err as { code?: unknown }).code !== "ENOENT") { - throw err; + await fs.promises.access(statePath, fs.constants.F_OK); + } catch { + migrating = true; + } + } + + // Write state file first (safer ordering for migration — see PR_DRAFT.md Atomicity). + if (stateChanged || migrating) { + await atomicWrite(statePath, stateJson); + serializedStoreCache.set(stateCacheKey, stateJson); + } + + if (configChanged || migrating) { + // Determine backup need: only when config actually changed (not migration-only). + const skipBackup = opts?.skipBackup === true || !configChanged; + if (!skipBackup) { + try { + const backupPath = `${storePath}.bak`; + await fs.promises.copyFile(storePath, backupPath); + await setSecureFileMode(backupPath); + } catch { + // best-effort } } + await atomicWrite(storePath, configJson); + serializedStoreCache.set(configCacheKey, configJson); } - if (previous === json) { - serializedStoreCache.set(storePath, json); - return; - } - const skipBackup = - opts?.skipBackup === true || shouldSkipCronBackupForRuntimeOnlyChanges(previous, store); - const tmp = `${storePath}.${process.pid}.${randomBytes(8).toString("hex")}.tmp`; - await fs.promises.writeFile(tmp, json, { encoding: "utf-8", mode: 0o600 }); - await setSecureFileMode(tmp); - if (previous !== null && !skipBackup) { - try { - const backupPath = `${storePath}.bak`; - await fs.promises.copyFile(storePath, backupPath); - await setSecureFileMode(backupPath); - } catch { - // best-effort - } - } - await renameWithRetry(tmp, storePath); - await setSecureFileMode(storePath); - serializedStoreCache.set(storePath, json); } const RENAME_MAX_RETRIES = 3; diff --git a/src/cron/types-shared.ts b/src/cron/types-shared.ts index 68c7f0c97a3..f52cfdd132b 100644 --- a/src/cron/types-shared.ts +++ b/src/cron/types-shared.ts @@ -9,6 +9,7 @@ export type CronJobBase