mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:50:43 +00:00
feat(cron): split jobs.json into config and runtime state files (#63105)
Merged via squash.
Prepared head SHA: 470bb2561f
Co-authored-by: Feelw00 <45638585+Feelw00@users.noreply.github.com>
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Reviewed-by: @gumadeiras
This commit is contained in:
@@ -7,6 +7,7 @@ Docs: https://docs.openclaw.ai
|
||||
### Changes
|
||||
|
||||
- Plugins/tests: reuse plugin loader alias and Jiti config resolution across repeated same-context loads, reducing import-heavy test overhead. (#69316) Thanks @amknight.
|
||||
- Cron: split runtime execution state into `jobs-state.json` so `jobs.json` stays stable for git-tracked job definitions. (#63105) Thanks @Feelw00.
|
||||
|
||||
### Fixes
|
||||
|
||||
|
||||
@@ -33,7 +33,9 @@ openclaw cron runs --id <job-id>
|
||||
## How cron works
|
||||
|
||||
- Cron runs **inside the Gateway** process (not inside the model).
|
||||
- Jobs persist at `~/.openclaw/cron/jobs.json` so restarts do not lose schedules.
|
||||
- Job definitions persist at `~/.openclaw/cron/jobs.json` so restarts do not lose schedules.
|
||||
- Runtime execution state persists next to it in `~/.openclaw/cron/jobs-state.json`. If you track cron definitions in git, track `jobs.json` and gitignore `jobs-state.json`.
|
||||
- After the split, older OpenClaw versions can read `jobs.json` but may treat jobs as fresh because runtime fields now live in `jobs-state.json`.
|
||||
- All cron executions create [background task](/automation/tasks) records.
|
||||
- One-shot jobs (`--at`) auto-delete after success by default.
|
||||
- Isolated cron runs best-effort close tracked browser tabs/processes for their `cron:<jobId>` session when the run completes, so detached browser automation does not leave orphaned processes behind.
|
||||
@@ -368,6 +370,10 @@ Model override note:
|
||||
}
|
||||
```
|
||||
|
||||
The runtime state sidecar is derived from `cron.store`: a `.json` store such as
|
||||
`~/clawd/cron/jobs.json` uses `~/clawd/cron/jobs-state.json`, while a store path
|
||||
without a `.json` suffix appends `-state.json`.
|
||||
|
||||
Disable cron: `cron.enabled: false` or `OPENCLAW_SKIP_CRON=1`.
|
||||
|
||||
**One-shot retry**: transient errors (rate limit, overload, network, server error) retry up to 3 times with exponential backoff. Permanent errors disable immediately.
|
||||
|
||||
@@ -301,7 +301,7 @@ See [Task Flow](/automation/taskflow) for details.
|
||||
|
||||
### Tasks and cron
|
||||
|
||||
A cron job **definition** lives in `~/.openclaw/cron/jobs.json`. **Every** cron execution creates a task record — both main-session and isolated. Main-session cron tasks default to `silent` notify policy so they track without generating notifications.
|
||||
A cron job **definition** lives in `~/.openclaw/cron/jobs.json`; runtime execution state lives beside it in `~/.openclaw/cron/jobs-state.json`. **Every** cron execution creates a task record — both main-session and isolated. Main-session cron tasks default to `silent` notify policy so they track without generating notifications.
|
||||
|
||||
See [Cron Jobs](/automation/cron-jobs).
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ import {
|
||||
writeCronStoreSnapshot,
|
||||
} from "./service.issue-regressions.test-helpers.js";
|
||||
import { CronService } from "./service.js";
|
||||
import { loadCronStore } from "./store.js";
|
||||
import type { CronJob, CronJobState } from "./types.js";
|
||||
|
||||
describe("Cron issue regressions", () => {
|
||||
@@ -92,9 +93,7 @@ describe("Cron issue regressions", () => {
|
||||
expect(Number.isFinite(isolated?.state.nextRunAtMs)).toBe(true);
|
||||
expect(Number.isFinite(status.nextWakeAtMs)).toBe(true);
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as {
|
||||
jobs: Array<{ id: string; state?: { nextRunAtMs?: number | null } }>;
|
||||
};
|
||||
const persisted = await loadCronStore(store.storePath);
|
||||
const persistedIsolated = persisted.jobs.find((job) => job.id === "legacy-isolated");
|
||||
expect(typeof persistedIsolated?.state?.nextRunAtMs).toBe("number");
|
||||
expect(Number.isFinite(persistedIsolated?.state?.nextRunAtMs)).toBe(true);
|
||||
@@ -187,9 +186,7 @@ describe("Cron issue regressions", () => {
|
||||
payload: { kind: "systemEvent", text: "other-updated" },
|
||||
});
|
||||
|
||||
const storeData = JSON.parse(await fs.readFile(store.storePath, "utf8")) as {
|
||||
jobs: Array<{ id: string; state?: { nextRunAtMs?: number } }>;
|
||||
};
|
||||
const storeData = await loadCronStore(store.storePath);
|
||||
const persistedDueJob = storeData.jobs.find((job) => job.id === dueJob.id);
|
||||
expect(persistedDueJob?.state?.nextRunAtMs).toBe(originalDueNextRunAtMs);
|
||||
|
||||
@@ -300,9 +297,7 @@ describe("Cron issue regressions", () => {
|
||||
const result = await cron.run(job.id, "force");
|
||||
expect(result).toEqual({ ok: true, ran: true });
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as {
|
||||
jobs: CronJob[];
|
||||
};
|
||||
const persisted = await loadCronStore(store.storePath);
|
||||
const persistedJob = persisted.jobs.find((entry) => entry.id === job.id);
|
||||
expect(persistedJob?.delivery?.to).toBe(rewrittenTarget);
|
||||
expect(persistedJob?.state.lastStatus).toBe("ok");
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import * as detachedTaskRuntime from "../../tasks/detached-task-runtime.js";
|
||||
import { findTaskByRunId, resetTaskRegistryForTests } from "../../tasks/task-registry.js";
|
||||
import { setupCronServiceSuite, writeCronStoreSnapshot } from "../service.test-harness.js";
|
||||
import { loadCronStore } from "../store.js";
|
||||
import type { CronJob } from "../types.js";
|
||||
import { run, start, stop, update } from "./ops.js";
|
||||
import { createCronServiceState } from "./state.js";
|
||||
@@ -102,7 +102,7 @@ async function expectDueIsolatedManualRunProgresses(storePath: string, now: numb
|
||||
|
||||
await expect(run(state, "isolated-timeout")).resolves.toEqual({ ok: true, ran: true });
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as {
|
||||
const persisted = (await loadCronStore(storePath)) as {
|
||||
jobs: CronJob[];
|
||||
};
|
||||
expect(persisted.jobs[0]?.state.runningAtMs).toBeUndefined();
|
||||
@@ -161,7 +161,7 @@ describe("cron service ops seam coverage", () => {
|
||||
expect(requestHeartbeatNow).toHaveBeenCalled();
|
||||
expect(state.timer).not.toBeNull();
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as {
|
||||
const persisted = (await loadCronStore(storePath)) as {
|
||||
jobs: CronJob[];
|
||||
};
|
||||
const job = persisted.jobs[0];
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
import fs from "node:fs/promises";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { setupCronServiceSuite, writeCronStoreSnapshot } from "../../cron/service.test-harness.js";
|
||||
import { createCronServiceState } from "../../cron/service/state.js";
|
||||
import { onTimer } from "../../cron/service/timer.js";
|
||||
import { loadCronStore } from "../../cron/store.js";
|
||||
import type { CronJob } from "../../cron/types.js";
|
||||
import * as detachedTaskRuntime from "../../tasks/detached-task-runtime.js";
|
||||
import { resetTaskRegistryForTests } from "../../tasks/task-registry.js";
|
||||
@@ -68,9 +68,7 @@ describe("cron service timer seam coverage", () => {
|
||||
heartbeat: { target: "last" },
|
||||
});
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as {
|
||||
jobs: CronJob[];
|
||||
};
|
||||
const persisted = await loadCronStore(storePath);
|
||||
const job = persisted.jobs[0];
|
||||
expect(job).toBeDefined();
|
||||
expect(job?.state.lastStatus).toBe("ok");
|
||||
|
||||
@@ -1,12 +1,31 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { setTimeout as scheduleNativeTimeout } from "node:timers";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { createCronStoreHarness } from "./service.test-harness.js";
|
||||
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { loadCronStore, resolveCronStorePath, saveCronStore } from "./store.js";
|
||||
import type { CronStoreFile } from "./types.js";
|
||||
|
||||
const { makeStorePath } = createCronStoreHarness({ prefix: "openclaw-cron-store-" });
|
||||
let fixtureRoot = "";
|
||||
let caseId = 0;
|
||||
|
||||
beforeAll(async () => {
|
||||
fixtureRoot = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cron-store-"));
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
if (fixtureRoot) {
|
||||
await fs.rm(fixtureRoot, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
async function makeStorePath() {
|
||||
const dir = path.join(fixtureRoot, `case-${caseId++}`);
|
||||
await fs.mkdir(dir, { recursive: true });
|
||||
return {
|
||||
storePath: path.join(dir, "cron", "jobs.json"),
|
||||
};
|
||||
}
|
||||
|
||||
function makeStore(jobId: string, enabled: boolean): CronStoreFile {
|
||||
const now = Date.now();
|
||||
@@ -109,8 +128,13 @@ describe("cron store", () => {
|
||||
|
||||
const currentRaw = await fs.readFile(store.storePath, "utf-8");
|
||||
const backupRaw = await fs.readFile(`${store.storePath}.bak`, "utf-8");
|
||||
expect(JSON.parse(currentRaw)).toEqual(second);
|
||||
expect(JSON.parse(backupRaw)).toEqual(first);
|
||||
const current = JSON.parse(currentRaw);
|
||||
const backup = JSON.parse(backupRaw);
|
||||
// jobs.json now contains config-only (state stripped to {}).
|
||||
expect(current.jobs[0].id).toBe("job-2");
|
||||
expect(current.jobs[0].state).toEqual({});
|
||||
expect(backup.jobs[0].id).toBe("job-1");
|
||||
expect(backup.jobs[0].state).toEqual({});
|
||||
});
|
||||
|
||||
it("skips backup files for runtime-only state churn", async () => {
|
||||
@@ -132,11 +156,220 @@ describe("cron store", () => {
|
||||
await saveCronStore(store.storePath, first);
|
||||
await saveCronStore(store.storePath, second);
|
||||
|
||||
const currentRaw = await fs.readFile(store.storePath, "utf-8");
|
||||
expect(JSON.parse(currentRaw)).toEqual(second);
|
||||
// jobs.json should NOT be rewritten (only runtime changed).
|
||||
const configRaw = await fs.readFile(store.storePath, "utf-8");
|
||||
const config = JSON.parse(configRaw);
|
||||
expect(config.jobs[0].state).toEqual({});
|
||||
expect(config.jobs[0]).not.toHaveProperty("updatedAtMs");
|
||||
|
||||
// State file should contain runtime fields.
|
||||
const statePath = store.storePath.replace(/\.json$/, "-state.json");
|
||||
const stateRaw = await fs.readFile(statePath, "utf-8");
|
||||
const stateFile = JSON.parse(stateRaw);
|
||||
expect(stateFile.jobs[first.jobs[0].id].state.nextRunAtMs).toBe(
|
||||
first.jobs[0].createdAtMs + 60_000,
|
||||
);
|
||||
|
||||
await expect(fs.stat(`${store.storePath}.bak`)).rejects.toThrow();
|
||||
});
|
||||
|
||||
it("keeps state separate for custom store paths without a json suffix", async () => {
|
||||
const store = await makeStorePath();
|
||||
const storePath = store.storePath.replace(/\.json$/, "");
|
||||
const statePath = `${storePath}-state.json`;
|
||||
const first = makeStore("job-1", true);
|
||||
const second: CronStoreFile = {
|
||||
...first,
|
||||
jobs: first.jobs.map((job) => ({
|
||||
...job,
|
||||
updatedAtMs: job.updatedAtMs + 60_000,
|
||||
state: {
|
||||
...job.state,
|
||||
nextRunAtMs: job.createdAtMs + 60_000,
|
||||
},
|
||||
})),
|
||||
};
|
||||
|
||||
await saveCronStore(storePath, first);
|
||||
await saveCronStore(storePath, second);
|
||||
|
||||
const config = JSON.parse(await fs.readFile(storePath, "utf-8"));
|
||||
expect(Array.isArray(config.jobs)).toBe(true);
|
||||
expect(config.jobs[0].id).toBe("job-1");
|
||||
expect(config.jobs[0].state).toEqual({});
|
||||
|
||||
const stateFile = JSON.parse(await fs.readFile(statePath, "utf-8"));
|
||||
expect(stateFile.jobs["job-1"].state.nextRunAtMs).toBe(first.jobs[0].createdAtMs + 60_000);
|
||||
|
||||
const loaded = await loadCronStore(storePath);
|
||||
expect(loaded.jobs[0]?.state.nextRunAtMs).toBe(first.jobs[0].createdAtMs + 60_000);
|
||||
});
|
||||
|
||||
it("recreates a missing state sidecar without rewriting unchanged config", async () => {
|
||||
const store = await makeStorePath();
|
||||
const statePath = store.storePath.replace(/\.json$/, "-state.json");
|
||||
const payload = makeStore("job-1", true);
|
||||
payload.jobs[0].state = { nextRunAtMs: payload.jobs[0].createdAtMs + 60_000 };
|
||||
|
||||
await saveCronStore(store.storePath, payload);
|
||||
await loadCronStore(store.storePath);
|
||||
const configRawBefore = await fs.readFile(store.storePath, "utf-8");
|
||||
await fs.rm(statePath);
|
||||
|
||||
const renamedDestinations: string[] = [];
|
||||
const origRename = fs.rename.bind(fs);
|
||||
const spy = vi.spyOn(fs, "rename").mockImplementation(async (src, dest) => {
|
||||
renamedDestinations.push(String(dest));
|
||||
return origRename(src, dest);
|
||||
});
|
||||
|
||||
try {
|
||||
await saveCronStore(store.storePath, payload);
|
||||
} finally {
|
||||
spy.mockRestore();
|
||||
}
|
||||
|
||||
const configRawAfter = await fs.readFile(store.storePath, "utf-8");
|
||||
const stateFile = JSON.parse(await fs.readFile(statePath, "utf-8"));
|
||||
|
||||
expect(configRawAfter).toBe(configRawBefore);
|
||||
expect(renamedDestinations).toContain(statePath);
|
||||
expect(renamedDestinations).not.toContain(store.storePath);
|
||||
expect(stateFile.jobs["job-1"].state.nextRunAtMs).toBe(payload.jobs[0].createdAtMs + 60_000);
|
||||
});
|
||||
|
||||
it("recreates a missing config file without rewriting unchanged state", async () => {
|
||||
const store = await makeStorePath();
|
||||
const statePath = store.storePath.replace(/\.json$/, "-state.json");
|
||||
const payload = makeStore("job-1", true);
|
||||
payload.jobs[0].state = { nextRunAtMs: payload.jobs[0].createdAtMs + 60_000 };
|
||||
|
||||
await saveCronStore(store.storePath, payload);
|
||||
await loadCronStore(store.storePath);
|
||||
const stateRawBefore = await fs.readFile(statePath, "utf-8");
|
||||
await fs.rm(store.storePath);
|
||||
|
||||
const renamedDestinations: string[] = [];
|
||||
const origRename = fs.rename.bind(fs);
|
||||
const spy = vi.spyOn(fs, "rename").mockImplementation(async (src, dest) => {
|
||||
renamedDestinations.push(String(dest));
|
||||
return origRename(src, dest);
|
||||
});
|
||||
|
||||
try {
|
||||
await saveCronStore(store.storePath, payload);
|
||||
} finally {
|
||||
spy.mockRestore();
|
||||
}
|
||||
|
||||
const config = JSON.parse(await fs.readFile(store.storePath, "utf-8"));
|
||||
const stateRawAfter = await fs.readFile(statePath, "utf-8");
|
||||
|
||||
expect(config.jobs[0].id).toBe("job-1");
|
||||
expect(config.jobs[0].state).toEqual({});
|
||||
expect(stateRawAfter).toBe(stateRawBefore);
|
||||
expect(renamedDestinations).toContain(store.storePath);
|
||||
expect(renamedDestinations).not.toContain(statePath);
|
||||
});
|
||||
|
||||
it("migrates legacy inline state into the state sidecar", async () => {
|
||||
const store = await makeStorePath();
|
||||
const statePath = store.storePath.replace(/\.json$/, "-state.json");
|
||||
const legacy = makeStore("job-1", true);
|
||||
legacy.jobs[0].state = {
|
||||
lastRunAtMs: legacy.jobs[0].createdAtMs + 30_000,
|
||||
nextRunAtMs: legacy.jobs[0].createdAtMs + 60_000,
|
||||
};
|
||||
|
||||
await fs.mkdir(path.dirname(store.storePath), { recursive: true });
|
||||
await fs.writeFile(store.storePath, JSON.stringify(legacy, null, 2), "utf-8");
|
||||
|
||||
const loaded = await loadCronStore(store.storePath);
|
||||
await saveCronStore(store.storePath, loaded);
|
||||
|
||||
const config = JSON.parse(await fs.readFile(store.storePath, "utf-8"));
|
||||
const stateFile = JSON.parse(await fs.readFile(statePath, "utf-8"));
|
||||
|
||||
expect(config.jobs[0]).not.toHaveProperty("updatedAtMs");
|
||||
expect(config.jobs[0].state).toEqual({});
|
||||
expect(stateFile.jobs["job-1"].updatedAtMs).toBe(legacy.jobs[0].updatedAtMs);
|
||||
expect(stateFile.jobs["job-1"].state.nextRunAtMs).toBe(legacy.jobs[0].createdAtMs + 60_000);
|
||||
});
|
||||
|
||||
it("treats a corrupt state sidecar as absent", async () => {
|
||||
const store = await makeStorePath();
|
||||
const payload = makeStore("job-1", true);
|
||||
payload.jobs[0].state = { nextRunAtMs: payload.jobs[0].createdAtMs + 60_000 };
|
||||
const statePath = store.storePath.replace(/\.json$/, "-state.json");
|
||||
|
||||
await saveCronStore(store.storePath, payload);
|
||||
await fs.writeFile(statePath, "{ not json", "utf-8");
|
||||
|
||||
const loaded = await loadCronStore(store.storePath);
|
||||
|
||||
expect(loaded.jobs[0]?.updatedAtMs).toBe(payload.jobs[0].createdAtMs);
|
||||
expect(loaded.jobs[0]?.state).toEqual({});
|
||||
});
|
||||
|
||||
it("propagates unreadable state sidecar errors", async () => {
|
||||
const store = await makeStorePath();
|
||||
const payload = makeStore("job-1", true);
|
||||
const statePath = store.storePath.replace(/\.json$/, "-state.json");
|
||||
|
||||
await saveCronStore(store.storePath, payload);
|
||||
|
||||
const origReadFile = fs.readFile.bind(fs);
|
||||
const spy = vi.spyOn(fs, "readFile").mockImplementation(async (filePath, options) => {
|
||||
if (filePath === statePath) {
|
||||
const err = new Error("permission denied") as NodeJS.ErrnoException;
|
||||
err.code = "EACCES";
|
||||
throw err;
|
||||
}
|
||||
return origReadFile(filePath, options as never) as never;
|
||||
});
|
||||
|
||||
try {
|
||||
await expect(loadCronStore(store.storePath)).rejects.toThrow(/Failed to read cron state/);
|
||||
} finally {
|
||||
spy.mockRestore();
|
||||
}
|
||||
});
|
||||
|
||||
it("sanitizes invalid updatedAtMs values from the state sidecar", async () => {
|
||||
const store = await makeStorePath();
|
||||
const job = makeStore("job-1", true).jobs[0];
|
||||
const config = {
|
||||
version: 1,
|
||||
jobs: [{ ...job, state: {}, updatedAtMs: undefined }],
|
||||
};
|
||||
const statePath = store.storePath.replace(/\.json$/, "-state.json");
|
||||
|
||||
await fs.mkdir(path.dirname(store.storePath), { recursive: true });
|
||||
await fs.writeFile(store.storePath, JSON.stringify(config, null, 2), "utf-8");
|
||||
await fs.writeFile(
|
||||
statePath,
|
||||
JSON.stringify(
|
||||
{
|
||||
version: 1,
|
||||
jobs: {
|
||||
[job.id]: {
|
||||
updatedAtMs: "invalid",
|
||||
state: { nextRunAtMs: job.createdAtMs + 60_000 },
|
||||
},
|
||||
},
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
const loaded = await loadCronStore(store.storePath);
|
||||
|
||||
expect(loaded.jobs[0]?.updatedAtMs).toBe(job.createdAtMs);
|
||||
expect(loaded.jobs[0]?.state.nextRunAtMs).toBe(job.createdAtMs + 60_000);
|
||||
});
|
||||
|
||||
it.skipIf(process.platform === "win32")(
|
||||
"writes store and backup files with secure permissions",
|
||||
async () => {
|
||||
|
||||
@@ -6,7 +6,22 @@ import { resolveConfigDir } from "../utils.js";
|
||||
import { parseJsonWithJson5Fallback } from "../utils/parse-json-compat.js";
|
||||
import type { CronStoreFile } from "./types.js";
|
||||
|
||||
const serializedStoreCache = new Map<string, string>();
|
||||
type SerializedStoreCacheEntry = {
|
||||
configJson?: string;
|
||||
stateJson?: string;
|
||||
needsSplitMigration: boolean;
|
||||
};
|
||||
|
||||
const serializedStoreCache = new Map<string, SerializedStoreCacheEntry>();
|
||||
|
||||
function getSerializedStoreCache(storePath: string): SerializedStoreCacheEntry {
|
||||
let entry = serializedStoreCache.get(storePath);
|
||||
if (!entry) {
|
||||
entry = { needsSplitMigration: false };
|
||||
serializedStoreCache.set(storePath, entry);
|
||||
}
|
||||
return entry;
|
||||
}
|
||||
|
||||
function resolveDefaultCronDir(): string {
|
||||
return path.join(resolveConfigDir(), "cron");
|
||||
@@ -16,51 +31,42 @@ function resolveDefaultCronStorePath(): string {
|
||||
return path.join(resolveDefaultCronDir(), "jobs.json");
|
||||
}
|
||||
|
||||
function resolveStatePath(storePath: string): string {
|
||||
if (storePath.endsWith(".json")) {
|
||||
return storePath.replace(/\.json$/, "-state.json");
|
||||
}
|
||||
return `${storePath}-state.json`;
|
||||
}
|
||||
|
||||
type CronStateFileEntry = {
|
||||
updatedAtMs?: number;
|
||||
state?: Record<string, unknown>;
|
||||
};
|
||||
|
||||
type CronStateFile = {
|
||||
version: 1;
|
||||
jobs: Record<string, CronStateFileEntry>;
|
||||
};
|
||||
|
||||
function stripRuntimeOnlyCronFields(store: CronStoreFile): unknown {
|
||||
return {
|
||||
version: store.version,
|
||||
jobs: store.jobs.map((job) => {
|
||||
const { state: _state, updatedAtMs: _updatedAtMs, ...rest } = job;
|
||||
return rest;
|
||||
return { ...rest, state: {} };
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
function parseCronStoreForBackupComparison(raw: string): CronStoreFile | null {
|
||||
try {
|
||||
const parsed = parseJsonWithJson5Fallback(raw);
|
||||
if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
|
||||
return null;
|
||||
}
|
||||
const version = (parsed as { version?: unknown }).version;
|
||||
const jobs = (parsed as { jobs?: unknown }).jobs;
|
||||
if (version !== 1 || !Array.isArray(jobs)) {
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
version: 1,
|
||||
jobs: jobs.filter(Boolean) as CronStoreFile["jobs"],
|
||||
function extractStateFile(store: CronStoreFile): CronStateFile {
|
||||
const jobs: Record<string, CronStateFileEntry> = {};
|
||||
for (const job of store.jobs) {
|
||||
jobs[job.id] = {
|
||||
updatedAtMs: job.updatedAtMs,
|
||||
state: job.state ?? {},
|
||||
};
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function shouldSkipCronBackupForRuntimeOnlyChanges(
|
||||
previousRaw: string | null,
|
||||
nextStore: CronStoreFile,
|
||||
): boolean {
|
||||
if (previousRaw === null) {
|
||||
return false;
|
||||
}
|
||||
const previous = parseCronStoreForBackupComparison(previousRaw);
|
||||
if (!previous) {
|
||||
return false;
|
||||
}
|
||||
return (
|
||||
JSON.stringify(stripRuntimeOnlyCronFields(previous)) ===
|
||||
JSON.stringify(stripRuntimeOnlyCronFields(nextStore))
|
||||
);
|
||||
return { version: 1, jobs };
|
||||
}
|
||||
|
||||
export function resolveCronStorePath(storePath?: string) {
|
||||
@@ -74,6 +80,71 @@ export function resolveCronStorePath(storePath?: string) {
|
||||
return resolveDefaultCronStorePath();
|
||||
}
|
||||
|
||||
async function loadStateFile(statePath: string): Promise<CronStateFile | null> {
|
||||
let raw: string;
|
||||
try {
|
||||
raw = await fs.promises.readFile(statePath, "utf-8");
|
||||
} catch (err) {
|
||||
if ((err as { code?: unknown })?.code === "ENOENT") {
|
||||
return null;
|
||||
}
|
||||
throw new Error(`Failed to read cron state at ${statePath}: ${String(err)}`, {
|
||||
cause: err,
|
||||
});
|
||||
}
|
||||
|
||||
try {
|
||||
const parsed = parseJsonWithJson5Fallback(raw);
|
||||
if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
|
||||
return null;
|
||||
}
|
||||
const record = parsed as Record<string, unknown>;
|
||||
if (record.version !== 1 || typeof record.jobs !== "object" || record.jobs === null) {
|
||||
return null;
|
||||
}
|
||||
return { version: 1, jobs: record.jobs as Record<string, CronStateFileEntry> };
|
||||
} catch {
|
||||
// Best-effort: if state file is corrupt, treat as absent.
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function hasInlineState(jobs: Array<Record<string, unknown> | null | undefined>): boolean {
|
||||
return jobs.some(
|
||||
(job) =>
|
||||
job != null &&
|
||||
job.state !== undefined &&
|
||||
typeof job.state === "object" &&
|
||||
job.state !== null &&
|
||||
Object.keys(job.state as Record<string, unknown>).length > 0,
|
||||
);
|
||||
}
|
||||
|
||||
function ensureJobStateObject(job: CronStoreFile["jobs"][number]): void {
|
||||
if (!job.state || typeof job.state !== "object") {
|
||||
job.state = {} as never;
|
||||
}
|
||||
}
|
||||
|
||||
function backfillMissingRuntimeFields(job: CronStoreFile["jobs"][number]): void {
|
||||
ensureJobStateObject(job);
|
||||
if (typeof job.updatedAtMs !== "number") {
|
||||
job.updatedAtMs = typeof job.createdAtMs === "number" ? job.createdAtMs : Date.now();
|
||||
}
|
||||
}
|
||||
|
||||
function resolveUpdatedAtMs(job: CronStoreFile["jobs"][number], updatedAtMs: unknown): number {
|
||||
if (typeof updatedAtMs === "number" && Number.isFinite(updatedAtMs)) {
|
||||
return updatedAtMs;
|
||||
}
|
||||
if (typeof job.updatedAtMs === "number" && Number.isFinite(job.updatedAtMs)) {
|
||||
return job.updatedAtMs;
|
||||
}
|
||||
return typeof job.createdAtMs === "number" && Number.isFinite(job.createdAtMs)
|
||||
? job.createdAtMs
|
||||
: Date.now();
|
||||
}
|
||||
|
||||
export async function loadCronStore(storePath: string): Promise<CronStoreFile> {
|
||||
try {
|
||||
const raw = await fs.promises.readFile(storePath, "utf-8");
|
||||
@@ -94,7 +165,45 @@ export async function loadCronStore(storePath: string): Promise<CronStoreFile> {
|
||||
version: 1 as const,
|
||||
jobs: jobs.filter(Boolean) as never as CronStoreFile["jobs"],
|
||||
};
|
||||
serializedStoreCache.set(storePath, JSON.stringify(store, null, 2));
|
||||
|
||||
// Load state file and merge.
|
||||
const statePath = resolveStatePath(storePath);
|
||||
const stateFile = await loadStateFile(statePath);
|
||||
const hasLegacyInlineState =
|
||||
!stateFile && hasInlineState(jobs as unknown as Array<Record<string, unknown>>);
|
||||
|
||||
if (stateFile) {
|
||||
// State file exists: merge state by job ID. Inline state in jobs.json is ignored.
|
||||
for (const job of store.jobs) {
|
||||
const entry = stateFile.jobs[job.id];
|
||||
if (entry) {
|
||||
job.updatedAtMs = resolveUpdatedAtMs(job, entry.updatedAtMs);
|
||||
job.state = (entry.state ?? {}) as never;
|
||||
} else {
|
||||
backfillMissingRuntimeFields(job);
|
||||
}
|
||||
}
|
||||
} else if (!hasLegacyInlineState) {
|
||||
// No state file, no inline state: fresh clone or first run.
|
||||
for (const job of store.jobs) {
|
||||
backfillMissingRuntimeFields(job);
|
||||
}
|
||||
}
|
||||
// else: migration mode — no state file but jobs.json has inline state. Use as-is.
|
||||
|
||||
// Ensure every job has a state object (defensive).
|
||||
for (const job of store.jobs) {
|
||||
ensureJobStateObject(job);
|
||||
}
|
||||
|
||||
const configJson = JSON.stringify(stripRuntimeOnlyCronFields(store), null, 2);
|
||||
const stateJson = JSON.stringify(extractStateFile(store), null, 2);
|
||||
serializedStoreCache.set(storePath, {
|
||||
configJson,
|
||||
stateJson,
|
||||
needsSplitMigration: hasLegacyInlineState,
|
||||
});
|
||||
|
||||
return store;
|
||||
} catch (err) {
|
||||
if ((err as { code?: unknown })?.code === "ENOENT") {
|
||||
@@ -113,51 +222,81 @@ async function setSecureFileMode(filePath: string): Promise<void> {
|
||||
await fs.promises.chmod(filePath, 0o600).catch(() => undefined);
|
||||
}
|
||||
|
||||
async function atomicWrite(filePath: string, content: string, dirMode = 0o700): Promise<void> {
|
||||
const dir = path.dirname(filePath);
|
||||
await fs.promises.mkdir(dir, { recursive: true, mode: dirMode });
|
||||
await fs.promises.chmod(dir, dirMode).catch(() => undefined);
|
||||
const tmp = `${filePath}.${process.pid}.${randomBytes(8).toString("hex")}.tmp`;
|
||||
await fs.promises.writeFile(tmp, content, { encoding: "utf-8", mode: 0o600 });
|
||||
await renameWithRetry(tmp, filePath);
|
||||
await setSecureFileMode(filePath);
|
||||
}
|
||||
|
||||
async function serializedFileNeedsWrite(
|
||||
filePath: string,
|
||||
expectedJson: string,
|
||||
contentChanged: boolean,
|
||||
): Promise<boolean> {
|
||||
if (contentChanged) {
|
||||
return true;
|
||||
}
|
||||
try {
|
||||
const diskJson = await fs.promises.readFile(filePath, "utf-8");
|
||||
return diskJson !== expectedJson;
|
||||
} catch (err) {
|
||||
if ((err as { code?: unknown })?.code === "ENOENT") {
|
||||
return true;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
export async function saveCronStore(
|
||||
storePath: string,
|
||||
store: CronStoreFile,
|
||||
opts?: SaveCronStoreOptions,
|
||||
) {
|
||||
const storeDir = path.dirname(storePath);
|
||||
await fs.promises.mkdir(storeDir, { recursive: true, mode: 0o700 });
|
||||
await fs.promises.chmod(storeDir, 0o700).catch(() => undefined);
|
||||
const json = JSON.stringify(store, null, 2);
|
||||
const cached = serializedStoreCache.get(storePath);
|
||||
if (cached === json) {
|
||||
const configJson = JSON.stringify(stripRuntimeOnlyCronFields(store), null, 2);
|
||||
const stateFile = extractStateFile(store);
|
||||
const stateJson = JSON.stringify(stateFile, null, 2);
|
||||
|
||||
const statePath = resolveStatePath(storePath);
|
||||
const cache = serializedStoreCache.get(storePath);
|
||||
|
||||
const configChanged = cache?.configJson !== configJson;
|
||||
const stateChanged = cache?.stateJson !== stateJson;
|
||||
const migrating = cache?.needsSplitMigration === true;
|
||||
const configNeedsWrite = await serializedFileNeedsWrite(storePath, configJson, configChanged);
|
||||
const stateNeedsWrite = await serializedFileNeedsWrite(statePath, stateJson, stateChanged);
|
||||
|
||||
if (!configNeedsWrite && !stateNeedsWrite && !migrating) {
|
||||
return;
|
||||
}
|
||||
|
||||
let previous: string | null = cached ?? null;
|
||||
if (previous === null) {
|
||||
try {
|
||||
previous = await fs.promises.readFile(storePath, "utf-8");
|
||||
} catch (err) {
|
||||
if ((err as { code?: unknown }).code !== "ENOENT") {
|
||||
throw err;
|
||||
const updatedCache = getSerializedStoreCache(storePath);
|
||||
|
||||
// Write state first so migration never leaves stripped config without runtime state.
|
||||
if (stateNeedsWrite || migrating) {
|
||||
await atomicWrite(statePath, stateJson);
|
||||
updatedCache.stateJson = stateJson;
|
||||
}
|
||||
|
||||
if (configNeedsWrite || migrating) {
|
||||
// Determine backup need: only when config actually changed (not migration-only).
|
||||
const skipBackup = opts?.skipBackup === true || !configChanged;
|
||||
if (!skipBackup) {
|
||||
try {
|
||||
const backupPath = `${storePath}.bak`;
|
||||
await fs.promises.copyFile(storePath, backupPath);
|
||||
await setSecureFileMode(backupPath);
|
||||
} catch {
|
||||
// best-effort
|
||||
}
|
||||
}
|
||||
await atomicWrite(storePath, configJson);
|
||||
updatedCache.configJson = configJson;
|
||||
}
|
||||
if (previous === json) {
|
||||
serializedStoreCache.set(storePath, json);
|
||||
return;
|
||||
}
|
||||
const skipBackup =
|
||||
opts?.skipBackup === true || shouldSkipCronBackupForRuntimeOnlyChanges(previous, store);
|
||||
const tmp = `${storePath}.${process.pid}.${randomBytes(8).toString("hex")}.tmp`;
|
||||
await fs.promises.writeFile(tmp, json, { encoding: "utf-8", mode: 0o600 });
|
||||
await setSecureFileMode(tmp);
|
||||
if (previous !== null && !skipBackup) {
|
||||
try {
|
||||
const backupPath = `${storePath}.bak`;
|
||||
await fs.promises.copyFile(storePath, backupPath);
|
||||
await setSecureFileMode(backupPath);
|
||||
} catch {
|
||||
// best-effort
|
||||
}
|
||||
}
|
||||
await renameWithRetry(tmp, storePath);
|
||||
await setSecureFileMode(storePath);
|
||||
serializedStoreCache.set(storePath, json);
|
||||
updatedCache.needsSplitMigration = false;
|
||||
}
|
||||
|
||||
const RENAME_MAX_RETRIES = 3;
|
||||
|
||||
Reference in New Issue
Block a user