diff --git a/CHANGELOG.md b/CHANGELOG.md index 403aa4db05f..d26a9f23559 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ Docs: https://docs.openclaw.ai ### Changes - Plugins/tests: reuse plugin loader alias and Jiti config resolution across repeated same-context loads, reducing import-heavy test overhead. (#69316) Thanks @amknight. +- Cron: split runtime execution state into `jobs-state.json` so `jobs.json` stays stable for git-tracked job definitions. (#63105) Thanks @Feelw00. ### Fixes diff --git a/docs/automation/cron-jobs.md b/docs/automation/cron-jobs.md index 45c59006a38..bf8caa127c0 100644 --- a/docs/automation/cron-jobs.md +++ b/docs/automation/cron-jobs.md @@ -33,7 +33,9 @@ openclaw cron runs --id ## How cron works - Cron runs **inside the Gateway** process (not inside the model). -- Jobs persist at `~/.openclaw/cron/jobs.json` so restarts do not lose schedules. +- Job definitions persist at `~/.openclaw/cron/jobs.json` so restarts do not lose schedules. +- Runtime execution state persists next to it in `~/.openclaw/cron/jobs-state.json`. If you track cron definitions in git, track `jobs.json` and gitignore `jobs-state.json`. +- After the split, older OpenClaw versions can read `jobs.json` but may treat jobs as fresh because runtime fields now live in `jobs-state.json`. - All cron executions create [background task](/automation/tasks) records. - One-shot jobs (`--at`) auto-delete after success by default. - Isolated cron runs best-effort close tracked browser tabs/processes for their `cron:` session when the run completes, so detached browser automation does not leave orphaned processes behind. @@ -368,6 +370,10 @@ Model override note: } ``` +The runtime state sidecar is derived from `cron.store`: a `.json` store such as +`~/clawd/cron/jobs.json` uses `~/clawd/cron/jobs-state.json`, while a store path +without a `.json` suffix appends `-state.json`. + Disable cron: `cron.enabled: false` or `OPENCLAW_SKIP_CRON=1`. **One-shot retry**: transient errors (rate limit, overload, network, server error) retry up to 3 times with exponential backoff. Permanent errors disable immediately. diff --git a/docs/automation/tasks.md b/docs/automation/tasks.md index 8b01365c59a..186e2499488 100644 --- a/docs/automation/tasks.md +++ b/docs/automation/tasks.md @@ -301,7 +301,7 @@ See [Task Flow](/automation/taskflow) for details. ### Tasks and cron -A cron job **definition** lives in `~/.openclaw/cron/jobs.json`. **Every** cron execution creates a task record — both main-session and isolated. Main-session cron tasks default to `silent` notify policy so they track without generating notifications. +A cron job **definition** lives in `~/.openclaw/cron/jobs.json`; runtime execution state lives beside it in `~/.openclaw/cron/jobs-state.json`. **Every** cron execution creates a task record — both main-session and isolated. Main-session cron tasks default to `silent` notify policy so they track without generating notifications. See [Cron Jobs](/automation/cron-jobs). diff --git a/src/cron/service.issue-regressions.test.ts b/src/cron/service.issue-regressions.test.ts index 7c400fc4e9c..83aad3c0ae0 100644 --- a/src/cron/service.issue-regressions.test.ts +++ b/src/cron/service.issue-regressions.test.ts @@ -8,6 +8,7 @@ import { writeCronStoreSnapshot, } from "./service.issue-regressions.test-helpers.js"; import { CronService } from "./service.js"; +import { loadCronStore } from "./store.js"; import type { CronJob, CronJobState } from "./types.js"; describe("Cron issue regressions", () => { @@ -92,9 +93,7 @@ describe("Cron issue regressions", () => { expect(Number.isFinite(isolated?.state.nextRunAtMs)).toBe(true); expect(Number.isFinite(status.nextWakeAtMs)).toBe(true); - const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as { - jobs: Array<{ id: string; state?: { nextRunAtMs?: number | null } }>; - }; + const persisted = await loadCronStore(store.storePath); const persistedIsolated = persisted.jobs.find((job) => job.id === "legacy-isolated"); expect(typeof persistedIsolated?.state?.nextRunAtMs).toBe("number"); expect(Number.isFinite(persistedIsolated?.state?.nextRunAtMs)).toBe(true); @@ -187,9 +186,7 @@ describe("Cron issue regressions", () => { payload: { kind: "systemEvent", text: "other-updated" }, }); - const storeData = JSON.parse(await fs.readFile(store.storePath, "utf8")) as { - jobs: Array<{ id: string; state?: { nextRunAtMs?: number } }>; - }; + const storeData = await loadCronStore(store.storePath); const persistedDueJob = storeData.jobs.find((job) => job.id === dueJob.id); expect(persistedDueJob?.state?.nextRunAtMs).toBe(originalDueNextRunAtMs); @@ -300,9 +297,7 @@ describe("Cron issue regressions", () => { const result = await cron.run(job.id, "force"); expect(result).toEqual({ ok: true, ran: true }); - const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as { - jobs: CronJob[]; - }; + const persisted = await loadCronStore(store.storePath); const persistedJob = persisted.jobs.find((entry) => entry.id === job.id); expect(persistedJob?.delivery?.to).toBe(rewrittenTarget); expect(persistedJob?.state.lastStatus).toBe("ok"); diff --git a/src/cron/service/ops.test.ts b/src/cron/service/ops.test.ts index 85bfda60bf3..87e87824900 100644 --- a/src/cron/service/ops.test.ts +++ b/src/cron/service/ops.test.ts @@ -1,9 +1,9 @@ -import fs from "node:fs/promises"; import path from "node:path"; import { describe, expect, it, vi } from "vitest"; import * as detachedTaskRuntime from "../../tasks/detached-task-runtime.js"; import { findTaskByRunId, resetTaskRegistryForTests } from "../../tasks/task-registry.js"; import { setupCronServiceSuite, writeCronStoreSnapshot } from "../service.test-harness.js"; +import { loadCronStore } from "../store.js"; import type { CronJob } from "../types.js"; import { run, start, stop, update } from "./ops.js"; import { createCronServiceState } from "./state.js"; @@ -102,7 +102,7 @@ async function expectDueIsolatedManualRunProgresses(storePath: string, now: numb await expect(run(state, "isolated-timeout")).resolves.toEqual({ ok: true, ran: true }); - const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as { + const persisted = (await loadCronStore(storePath)) as { jobs: CronJob[]; }; expect(persisted.jobs[0]?.state.runningAtMs).toBeUndefined(); @@ -161,7 +161,7 @@ describe("cron service ops seam coverage", () => { expect(requestHeartbeatNow).toHaveBeenCalled(); expect(state.timer).not.toBeNull(); - const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as { + const persisted = (await loadCronStore(storePath)) as { jobs: CronJob[]; }; const job = persisted.jobs[0]; diff --git a/src/cron/service/timer.test.ts b/src/cron/service/timer.test.ts index 26033177247..d38a1dc754c 100644 --- a/src/cron/service/timer.test.ts +++ b/src/cron/service/timer.test.ts @@ -1,8 +1,8 @@ -import fs from "node:fs/promises"; import { afterEach, describe, expect, it, vi } from "vitest"; import { setupCronServiceSuite, writeCronStoreSnapshot } from "../../cron/service.test-harness.js"; import { createCronServiceState } from "../../cron/service/state.js"; import { onTimer } from "../../cron/service/timer.js"; +import { loadCronStore } from "../../cron/store.js"; import type { CronJob } from "../../cron/types.js"; import * as detachedTaskRuntime from "../../tasks/detached-task-runtime.js"; import { resetTaskRegistryForTests } from "../../tasks/task-registry.js"; @@ -68,9 +68,7 @@ describe("cron service timer seam coverage", () => { heartbeat: { target: "last" }, }); - const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as { - jobs: CronJob[]; - }; + const persisted = await loadCronStore(storePath); const job = persisted.jobs[0]; expect(job).toBeDefined(); expect(job?.state.lastStatus).toBe("ok"); diff --git a/src/cron/store.test.ts b/src/cron/store.test.ts index 90e04365396..dde5ca4b28e 100644 --- a/src/cron/store.test.ts +++ b/src/cron/store.test.ts @@ -1,12 +1,31 @@ import fs from "node:fs/promises"; +import os from "node:os"; import path from "node:path"; import { setTimeout as scheduleNativeTimeout } from "node:timers"; -import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import { createCronStoreHarness } from "./service.test-harness.js"; +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; import { loadCronStore, resolveCronStorePath, saveCronStore } from "./store.js"; import type { CronStoreFile } from "./types.js"; -const { makeStorePath } = createCronStoreHarness({ prefix: "openclaw-cron-store-" }); +let fixtureRoot = ""; +let caseId = 0; + +beforeAll(async () => { + fixtureRoot = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cron-store-")); +}); + +afterAll(async () => { + if (fixtureRoot) { + await fs.rm(fixtureRoot, { recursive: true, force: true }); + } +}); + +async function makeStorePath() { + const dir = path.join(fixtureRoot, `case-${caseId++}`); + await fs.mkdir(dir, { recursive: true }); + return { + storePath: path.join(dir, "cron", "jobs.json"), + }; +} function makeStore(jobId: string, enabled: boolean): CronStoreFile { const now = Date.now(); @@ -109,8 +128,13 @@ describe("cron store", () => { const currentRaw = await fs.readFile(store.storePath, "utf-8"); const backupRaw = await fs.readFile(`${store.storePath}.bak`, "utf-8"); - expect(JSON.parse(currentRaw)).toEqual(second); - expect(JSON.parse(backupRaw)).toEqual(first); + const current = JSON.parse(currentRaw); + const backup = JSON.parse(backupRaw); + // jobs.json now contains config-only (state stripped to {}). + expect(current.jobs[0].id).toBe("job-2"); + expect(current.jobs[0].state).toEqual({}); + expect(backup.jobs[0].id).toBe("job-1"); + expect(backup.jobs[0].state).toEqual({}); }); it("skips backup files for runtime-only state churn", async () => { @@ -132,11 +156,220 @@ describe("cron store", () => { await saveCronStore(store.storePath, first); await saveCronStore(store.storePath, second); - const currentRaw = await fs.readFile(store.storePath, "utf-8"); - expect(JSON.parse(currentRaw)).toEqual(second); + // jobs.json should NOT be rewritten (only runtime changed). + const configRaw = await fs.readFile(store.storePath, "utf-8"); + const config = JSON.parse(configRaw); + expect(config.jobs[0].state).toEqual({}); + expect(config.jobs[0]).not.toHaveProperty("updatedAtMs"); + + // State file should contain runtime fields. + const statePath = store.storePath.replace(/\.json$/, "-state.json"); + const stateRaw = await fs.readFile(statePath, "utf-8"); + const stateFile = JSON.parse(stateRaw); + expect(stateFile.jobs[first.jobs[0].id].state.nextRunAtMs).toBe( + first.jobs[0].createdAtMs + 60_000, + ); + await expect(fs.stat(`${store.storePath}.bak`)).rejects.toThrow(); }); + it("keeps state separate for custom store paths without a json suffix", async () => { + const store = await makeStorePath(); + const storePath = store.storePath.replace(/\.json$/, ""); + const statePath = `${storePath}-state.json`; + const first = makeStore("job-1", true); + const second: CronStoreFile = { + ...first, + jobs: first.jobs.map((job) => ({ + ...job, + updatedAtMs: job.updatedAtMs + 60_000, + state: { + ...job.state, + nextRunAtMs: job.createdAtMs + 60_000, + }, + })), + }; + + await saveCronStore(storePath, first); + await saveCronStore(storePath, second); + + const config = JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(Array.isArray(config.jobs)).toBe(true); + expect(config.jobs[0].id).toBe("job-1"); + expect(config.jobs[0].state).toEqual({}); + + const stateFile = JSON.parse(await fs.readFile(statePath, "utf-8")); + expect(stateFile.jobs["job-1"].state.nextRunAtMs).toBe(first.jobs[0].createdAtMs + 60_000); + + const loaded = await loadCronStore(storePath); + expect(loaded.jobs[0]?.state.nextRunAtMs).toBe(first.jobs[0].createdAtMs + 60_000); + }); + + it("recreates a missing state sidecar without rewriting unchanged config", async () => { + const store = await makeStorePath(); + const statePath = store.storePath.replace(/\.json$/, "-state.json"); + const payload = makeStore("job-1", true); + payload.jobs[0].state = { nextRunAtMs: payload.jobs[0].createdAtMs + 60_000 }; + + await saveCronStore(store.storePath, payload); + await loadCronStore(store.storePath); + const configRawBefore = await fs.readFile(store.storePath, "utf-8"); + await fs.rm(statePath); + + const renamedDestinations: string[] = []; + const origRename = fs.rename.bind(fs); + const spy = vi.spyOn(fs, "rename").mockImplementation(async (src, dest) => { + renamedDestinations.push(String(dest)); + return origRename(src, dest); + }); + + try { + await saveCronStore(store.storePath, payload); + } finally { + spy.mockRestore(); + } + + const configRawAfter = await fs.readFile(store.storePath, "utf-8"); + const stateFile = JSON.parse(await fs.readFile(statePath, "utf-8")); + + expect(configRawAfter).toBe(configRawBefore); + expect(renamedDestinations).toContain(statePath); + expect(renamedDestinations).not.toContain(store.storePath); + expect(stateFile.jobs["job-1"].state.nextRunAtMs).toBe(payload.jobs[0].createdAtMs + 60_000); + }); + + it("recreates a missing config file without rewriting unchanged state", async () => { + const store = await makeStorePath(); + const statePath = store.storePath.replace(/\.json$/, "-state.json"); + const payload = makeStore("job-1", true); + payload.jobs[0].state = { nextRunAtMs: payload.jobs[0].createdAtMs + 60_000 }; + + await saveCronStore(store.storePath, payload); + await loadCronStore(store.storePath); + const stateRawBefore = await fs.readFile(statePath, "utf-8"); + await fs.rm(store.storePath); + + const renamedDestinations: string[] = []; + const origRename = fs.rename.bind(fs); + const spy = vi.spyOn(fs, "rename").mockImplementation(async (src, dest) => { + renamedDestinations.push(String(dest)); + return origRename(src, dest); + }); + + try { + await saveCronStore(store.storePath, payload); + } finally { + spy.mockRestore(); + } + + const config = JSON.parse(await fs.readFile(store.storePath, "utf-8")); + const stateRawAfter = await fs.readFile(statePath, "utf-8"); + + expect(config.jobs[0].id).toBe("job-1"); + expect(config.jobs[0].state).toEqual({}); + expect(stateRawAfter).toBe(stateRawBefore); + expect(renamedDestinations).toContain(store.storePath); + expect(renamedDestinations).not.toContain(statePath); + }); + + it("migrates legacy inline state into the state sidecar", async () => { + const store = await makeStorePath(); + const statePath = store.storePath.replace(/\.json$/, "-state.json"); + const legacy = makeStore("job-1", true); + legacy.jobs[0].state = { + lastRunAtMs: legacy.jobs[0].createdAtMs + 30_000, + nextRunAtMs: legacy.jobs[0].createdAtMs + 60_000, + }; + + await fs.mkdir(path.dirname(store.storePath), { recursive: true }); + await fs.writeFile(store.storePath, JSON.stringify(legacy, null, 2), "utf-8"); + + const loaded = await loadCronStore(store.storePath); + await saveCronStore(store.storePath, loaded); + + const config = JSON.parse(await fs.readFile(store.storePath, "utf-8")); + const stateFile = JSON.parse(await fs.readFile(statePath, "utf-8")); + + expect(config.jobs[0]).not.toHaveProperty("updatedAtMs"); + expect(config.jobs[0].state).toEqual({}); + expect(stateFile.jobs["job-1"].updatedAtMs).toBe(legacy.jobs[0].updatedAtMs); + expect(stateFile.jobs["job-1"].state.nextRunAtMs).toBe(legacy.jobs[0].createdAtMs + 60_000); + }); + + it("treats a corrupt state sidecar as absent", async () => { + const store = await makeStorePath(); + const payload = makeStore("job-1", true); + payload.jobs[0].state = { nextRunAtMs: payload.jobs[0].createdAtMs + 60_000 }; + const statePath = store.storePath.replace(/\.json$/, "-state.json"); + + await saveCronStore(store.storePath, payload); + await fs.writeFile(statePath, "{ not json", "utf-8"); + + const loaded = await loadCronStore(store.storePath); + + expect(loaded.jobs[0]?.updatedAtMs).toBe(payload.jobs[0].createdAtMs); + expect(loaded.jobs[0]?.state).toEqual({}); + }); + + it("propagates unreadable state sidecar errors", async () => { + const store = await makeStorePath(); + const payload = makeStore("job-1", true); + const statePath = store.storePath.replace(/\.json$/, "-state.json"); + + await saveCronStore(store.storePath, payload); + + const origReadFile = fs.readFile.bind(fs); + const spy = vi.spyOn(fs, "readFile").mockImplementation(async (filePath, options) => { + if (filePath === statePath) { + const err = new Error("permission denied") as NodeJS.ErrnoException; + err.code = "EACCES"; + throw err; + } + return origReadFile(filePath, options as never) as never; + }); + + try { + await expect(loadCronStore(store.storePath)).rejects.toThrow(/Failed to read cron state/); + } finally { + spy.mockRestore(); + } + }); + + it("sanitizes invalid updatedAtMs values from the state sidecar", async () => { + const store = await makeStorePath(); + const job = makeStore("job-1", true).jobs[0]; + const config = { + version: 1, + jobs: [{ ...job, state: {}, updatedAtMs: undefined }], + }; + const statePath = store.storePath.replace(/\.json$/, "-state.json"); + + await fs.mkdir(path.dirname(store.storePath), { recursive: true }); + await fs.writeFile(store.storePath, JSON.stringify(config, null, 2), "utf-8"); + await fs.writeFile( + statePath, + JSON.stringify( + { + version: 1, + jobs: { + [job.id]: { + updatedAtMs: "invalid", + state: { nextRunAtMs: job.createdAtMs + 60_000 }, + }, + }, + }, + null, + 2, + ), + "utf-8", + ); + + const loaded = await loadCronStore(store.storePath); + + expect(loaded.jobs[0]?.updatedAtMs).toBe(job.createdAtMs); + expect(loaded.jobs[0]?.state.nextRunAtMs).toBe(job.createdAtMs + 60_000); + }); + it.skipIf(process.platform === "win32")( "writes store and backup files with secure permissions", async () => { diff --git a/src/cron/store.ts b/src/cron/store.ts index d6fcaa337dd..ad8075dcf17 100644 --- a/src/cron/store.ts +++ b/src/cron/store.ts @@ -6,7 +6,22 @@ import { resolveConfigDir } from "../utils.js"; import { parseJsonWithJson5Fallback } from "../utils/parse-json-compat.js"; import type { CronStoreFile } from "./types.js"; -const serializedStoreCache = new Map(); +type SerializedStoreCacheEntry = { + configJson?: string; + stateJson?: string; + needsSplitMigration: boolean; +}; + +const serializedStoreCache = new Map(); + +function getSerializedStoreCache(storePath: string): SerializedStoreCacheEntry { + let entry = serializedStoreCache.get(storePath); + if (!entry) { + entry = { needsSplitMigration: false }; + serializedStoreCache.set(storePath, entry); + } + return entry; +} function resolveDefaultCronDir(): string { return path.join(resolveConfigDir(), "cron"); @@ -16,51 +31,42 @@ function resolveDefaultCronStorePath(): string { return path.join(resolveDefaultCronDir(), "jobs.json"); } +function resolveStatePath(storePath: string): string { + if (storePath.endsWith(".json")) { + return storePath.replace(/\.json$/, "-state.json"); + } + return `${storePath}-state.json`; +} + +type CronStateFileEntry = { + updatedAtMs?: number; + state?: Record; +}; + +type CronStateFile = { + version: 1; + jobs: Record; +}; + function stripRuntimeOnlyCronFields(store: CronStoreFile): unknown { return { version: store.version, jobs: store.jobs.map((job) => { const { state: _state, updatedAtMs: _updatedAtMs, ...rest } = job; - return rest; + return { ...rest, state: {} }; }), }; } -function parseCronStoreForBackupComparison(raw: string): CronStoreFile | null { - try { - const parsed = parseJsonWithJson5Fallback(raw); - if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) { - return null; - } - const version = (parsed as { version?: unknown }).version; - const jobs = (parsed as { jobs?: unknown }).jobs; - if (version !== 1 || !Array.isArray(jobs)) { - return null; - } - return { - version: 1, - jobs: jobs.filter(Boolean) as CronStoreFile["jobs"], +function extractStateFile(store: CronStoreFile): CronStateFile { + const jobs: Record = {}; + for (const job of store.jobs) { + jobs[job.id] = { + updatedAtMs: job.updatedAtMs, + state: job.state ?? {}, }; - } catch { - return null; } -} - -function shouldSkipCronBackupForRuntimeOnlyChanges( - previousRaw: string | null, - nextStore: CronStoreFile, -): boolean { - if (previousRaw === null) { - return false; - } - const previous = parseCronStoreForBackupComparison(previousRaw); - if (!previous) { - return false; - } - return ( - JSON.stringify(stripRuntimeOnlyCronFields(previous)) === - JSON.stringify(stripRuntimeOnlyCronFields(nextStore)) - ); + return { version: 1, jobs }; } export function resolveCronStorePath(storePath?: string) { @@ -74,6 +80,71 @@ export function resolveCronStorePath(storePath?: string) { return resolveDefaultCronStorePath(); } +async function loadStateFile(statePath: string): Promise { + let raw: string; + try { + raw = await fs.promises.readFile(statePath, "utf-8"); + } catch (err) { + if ((err as { code?: unknown })?.code === "ENOENT") { + return null; + } + throw new Error(`Failed to read cron state at ${statePath}: ${String(err)}`, { + cause: err, + }); + } + + try { + const parsed = parseJsonWithJson5Fallback(raw); + if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) { + return null; + } + const record = parsed as Record; + if (record.version !== 1 || typeof record.jobs !== "object" || record.jobs === null) { + return null; + } + return { version: 1, jobs: record.jobs as Record }; + } catch { + // Best-effort: if state file is corrupt, treat as absent. + return null; + } +} + +function hasInlineState(jobs: Array | null | undefined>): boolean { + return jobs.some( + (job) => + job != null && + job.state !== undefined && + typeof job.state === "object" && + job.state !== null && + Object.keys(job.state as Record).length > 0, + ); +} + +function ensureJobStateObject(job: CronStoreFile["jobs"][number]): void { + if (!job.state || typeof job.state !== "object") { + job.state = {} as never; + } +} + +function backfillMissingRuntimeFields(job: CronStoreFile["jobs"][number]): void { + ensureJobStateObject(job); + if (typeof job.updatedAtMs !== "number") { + job.updatedAtMs = typeof job.createdAtMs === "number" ? job.createdAtMs : Date.now(); + } +} + +function resolveUpdatedAtMs(job: CronStoreFile["jobs"][number], updatedAtMs: unknown): number { + if (typeof updatedAtMs === "number" && Number.isFinite(updatedAtMs)) { + return updatedAtMs; + } + if (typeof job.updatedAtMs === "number" && Number.isFinite(job.updatedAtMs)) { + return job.updatedAtMs; + } + return typeof job.createdAtMs === "number" && Number.isFinite(job.createdAtMs) + ? job.createdAtMs + : Date.now(); +} + export async function loadCronStore(storePath: string): Promise { try { const raw = await fs.promises.readFile(storePath, "utf-8"); @@ -94,7 +165,45 @@ export async function loadCronStore(storePath: string): Promise { version: 1 as const, jobs: jobs.filter(Boolean) as never as CronStoreFile["jobs"], }; - serializedStoreCache.set(storePath, JSON.stringify(store, null, 2)); + + // Load state file and merge. + const statePath = resolveStatePath(storePath); + const stateFile = await loadStateFile(statePath); + const hasLegacyInlineState = + !stateFile && hasInlineState(jobs as unknown as Array>); + + if (stateFile) { + // State file exists: merge state by job ID. Inline state in jobs.json is ignored. + for (const job of store.jobs) { + const entry = stateFile.jobs[job.id]; + if (entry) { + job.updatedAtMs = resolveUpdatedAtMs(job, entry.updatedAtMs); + job.state = (entry.state ?? {}) as never; + } else { + backfillMissingRuntimeFields(job); + } + } + } else if (!hasLegacyInlineState) { + // No state file, no inline state: fresh clone or first run. + for (const job of store.jobs) { + backfillMissingRuntimeFields(job); + } + } + // else: migration mode — no state file but jobs.json has inline state. Use as-is. + + // Ensure every job has a state object (defensive). + for (const job of store.jobs) { + ensureJobStateObject(job); + } + + const configJson = JSON.stringify(stripRuntimeOnlyCronFields(store), null, 2); + const stateJson = JSON.stringify(extractStateFile(store), null, 2); + serializedStoreCache.set(storePath, { + configJson, + stateJson, + needsSplitMigration: hasLegacyInlineState, + }); + return store; } catch (err) { if ((err as { code?: unknown })?.code === "ENOENT") { @@ -113,51 +222,81 @@ async function setSecureFileMode(filePath: string): Promise { await fs.promises.chmod(filePath, 0o600).catch(() => undefined); } +async function atomicWrite(filePath: string, content: string, dirMode = 0o700): Promise { + const dir = path.dirname(filePath); + await fs.promises.mkdir(dir, { recursive: true, mode: dirMode }); + await fs.promises.chmod(dir, dirMode).catch(() => undefined); + const tmp = `${filePath}.${process.pid}.${randomBytes(8).toString("hex")}.tmp`; + await fs.promises.writeFile(tmp, content, { encoding: "utf-8", mode: 0o600 }); + await renameWithRetry(tmp, filePath); + await setSecureFileMode(filePath); +} + +async function serializedFileNeedsWrite( + filePath: string, + expectedJson: string, + contentChanged: boolean, +): Promise { + if (contentChanged) { + return true; + } + try { + const diskJson = await fs.promises.readFile(filePath, "utf-8"); + return diskJson !== expectedJson; + } catch (err) { + if ((err as { code?: unknown })?.code === "ENOENT") { + return true; + } + throw err; + } +} + export async function saveCronStore( storePath: string, store: CronStoreFile, opts?: SaveCronStoreOptions, ) { - const storeDir = path.dirname(storePath); - await fs.promises.mkdir(storeDir, { recursive: true, mode: 0o700 }); - await fs.promises.chmod(storeDir, 0o700).catch(() => undefined); - const json = JSON.stringify(store, null, 2); - const cached = serializedStoreCache.get(storePath); - if (cached === json) { + const configJson = JSON.stringify(stripRuntimeOnlyCronFields(store), null, 2); + const stateFile = extractStateFile(store); + const stateJson = JSON.stringify(stateFile, null, 2); + + const statePath = resolveStatePath(storePath); + const cache = serializedStoreCache.get(storePath); + + const configChanged = cache?.configJson !== configJson; + const stateChanged = cache?.stateJson !== stateJson; + const migrating = cache?.needsSplitMigration === true; + const configNeedsWrite = await serializedFileNeedsWrite(storePath, configJson, configChanged); + const stateNeedsWrite = await serializedFileNeedsWrite(statePath, stateJson, stateChanged); + + if (!configNeedsWrite && !stateNeedsWrite && !migrating) { return; } - let previous: string | null = cached ?? null; - if (previous === null) { - try { - previous = await fs.promises.readFile(storePath, "utf-8"); - } catch (err) { - if ((err as { code?: unknown }).code !== "ENOENT") { - throw err; + const updatedCache = getSerializedStoreCache(storePath); + + // Write state first so migration never leaves stripped config without runtime state. + if (stateNeedsWrite || migrating) { + await atomicWrite(statePath, stateJson); + updatedCache.stateJson = stateJson; + } + + if (configNeedsWrite || migrating) { + // Determine backup need: only when config actually changed (not migration-only). + const skipBackup = opts?.skipBackup === true || !configChanged; + if (!skipBackup) { + try { + const backupPath = `${storePath}.bak`; + await fs.promises.copyFile(storePath, backupPath); + await setSecureFileMode(backupPath); + } catch { + // best-effort } } + await atomicWrite(storePath, configJson); + updatedCache.configJson = configJson; } - if (previous === json) { - serializedStoreCache.set(storePath, json); - return; - } - const skipBackup = - opts?.skipBackup === true || shouldSkipCronBackupForRuntimeOnlyChanges(previous, store); - const tmp = `${storePath}.${process.pid}.${randomBytes(8).toString("hex")}.tmp`; - await fs.promises.writeFile(tmp, json, { encoding: "utf-8", mode: 0o600 }); - await setSecureFileMode(tmp); - if (previous !== null && !skipBackup) { - try { - const backupPath = `${storePath}.bak`; - await fs.promises.copyFile(storePath, backupPath); - await setSecureFileMode(backupPath); - } catch { - // best-effort - } - } - await renameWithRetry(tmp, storePath); - await setSecureFileMode(storePath); - serializedStoreCache.set(storePath, json); + updatedCache.needsSplitMigration = false; } const RENAME_MAX_RETRIES = 3;