mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 17:31:06 +00:00
feat(cron): split jobs.json into config and runtime state files
Separate cron store into jobs.json (config, git-trackable) and jobs-state.json (runtime state, gitignored) so that cron executions no longer produce meaningless diffs when tracking config in git. Closes #53581 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
committed by
Gustavo Madeira Santana
parent
33e63d914b
commit
ad996e1320
@@ -589,6 +589,7 @@ export function createJob(state: CronServiceState, input: CronJobCreate): CronJo
|
||||
deleteAfterRun,
|
||||
createdAtMs: now,
|
||||
updatedAtMs: now,
|
||||
configUpdatedAtMs: now,
|
||||
schedule,
|
||||
sessionTarget: input.sessionTarget,
|
||||
wakeMode: input.wakeMode,
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import * as detachedTaskRuntime from "../../tasks/detached-task-runtime.js";
|
||||
import { findTaskByRunId, resetTaskRegistryForTests } from "../../tasks/task-registry.js";
|
||||
import { setupCronServiceSuite, writeCronStoreSnapshot } from "../service.test-harness.js";
|
||||
import { loadCronStore } from "../store.js";
|
||||
import type { CronJob } from "../types.js";
|
||||
import { run, start, stop, update } from "./ops.js";
|
||||
import { createCronServiceState } from "./state.js";
|
||||
@@ -102,7 +102,7 @@ async function expectDueIsolatedManualRunProgresses(storePath: string, now: numb
|
||||
|
||||
await expect(run(state, "isolated-timeout")).resolves.toEqual({ ok: true, ran: true });
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as {
|
||||
const persisted = (await loadCronStore(storePath)) as {
|
||||
jobs: CronJob[];
|
||||
};
|
||||
expect(persisted.jobs[0]?.state.runningAtMs).toBeUndefined();
|
||||
@@ -161,7 +161,7 @@ describe("cron service ops seam coverage", () => {
|
||||
expect(requestHeartbeatNow).toHaveBeenCalled();
|
||||
expect(state.timer).not.toBeNull();
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as {
|
||||
const persisted = (await loadCronStore(storePath)) as {
|
||||
jobs: CronJob[];
|
||||
};
|
||||
const job = persisted.jobs[0];
|
||||
|
||||
@@ -307,6 +307,7 @@ export async function update(state: CronServiceState, id: string, patch: CronJob
|
||||
const enabledChanged = patch.enabled !== undefined;
|
||||
|
||||
job.updatedAtMs = now;
|
||||
job.configUpdatedAtMs = now;
|
||||
if (scheduleChanged || enabledChanged) {
|
||||
if (isJobEnabled(job)) {
|
||||
job.state.nextRunAtMs = computeJobNextRunAtMs(job, now);
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
import fs from "node:fs/promises";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { setupCronServiceSuite, writeCronStoreSnapshot } from "../../cron/service.test-harness.js";
|
||||
import { createCronServiceState } from "../../cron/service/state.js";
|
||||
import { onTimer } from "../../cron/service/timer.js";
|
||||
import { loadCronStore } from "../../cron/store.js";
|
||||
import type { CronJob } from "../../cron/types.js";
|
||||
import * as detachedTaskRuntime from "../../tasks/detached-task-runtime.js";
|
||||
import { resetTaskRegistryForTests } from "../../tasks/task-registry.js";
|
||||
@@ -68,9 +68,7 @@ describe("cron service timer seam coverage", () => {
|
||||
heartbeat: { target: "last" },
|
||||
});
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as {
|
||||
jobs: CronJob[];
|
||||
};
|
||||
const persisted = await loadCronStore(storePath);
|
||||
const job = persisted.jobs[0];
|
||||
expect(job).toBeDefined();
|
||||
expect(job?.state.lastStatus).toBe("ok");
|
||||
|
||||
@@ -109,8 +109,13 @@ describe("cron store", () => {
|
||||
|
||||
const currentRaw = await fs.readFile(store.storePath, "utf-8");
|
||||
const backupRaw = await fs.readFile(`${store.storePath}.bak`, "utf-8");
|
||||
expect(JSON.parse(currentRaw)).toEqual(second);
|
||||
expect(JSON.parse(backupRaw)).toEqual(first);
|
||||
const current = JSON.parse(currentRaw);
|
||||
const backup = JSON.parse(backupRaw);
|
||||
// jobs.json now contains config-only (state stripped to {}).
|
||||
expect(current.jobs[0].id).toBe("job-2");
|
||||
expect(current.jobs[0].state).toEqual({});
|
||||
expect(backup.jobs[0].id).toBe("job-1");
|
||||
expect(backup.jobs[0].state).toEqual({});
|
||||
});
|
||||
|
||||
it("skips backup files for runtime-only state churn", async () => {
|
||||
@@ -132,8 +137,20 @@ describe("cron store", () => {
|
||||
await saveCronStore(store.storePath, first);
|
||||
await saveCronStore(store.storePath, second);
|
||||
|
||||
const currentRaw = await fs.readFile(store.storePath, "utf-8");
|
||||
expect(JSON.parse(currentRaw)).toEqual(second);
|
||||
// jobs.json should NOT be rewritten (only runtime changed).
|
||||
const configRaw = await fs.readFile(store.storePath, "utf-8");
|
||||
const config = JSON.parse(configRaw);
|
||||
expect(config.jobs[0].state).toEqual({});
|
||||
expect(config.jobs[0]).not.toHaveProperty("updatedAtMs");
|
||||
|
||||
// State file should contain runtime fields.
|
||||
const statePath = store.storePath.replace(/\.json$/, "-state.json");
|
||||
const stateRaw = await fs.readFile(statePath, "utf-8");
|
||||
const stateFile = JSON.parse(stateRaw);
|
||||
expect(stateFile.jobs[first.jobs[0].id].state.nextRunAtMs).toBe(
|
||||
first.jobs[0].createdAtMs + 60_000,
|
||||
);
|
||||
|
||||
await expect(fs.stat(`${store.storePath}.bak`)).rejects.toThrow();
|
||||
});
|
||||
|
||||
|
||||
@@ -16,51 +16,39 @@ function resolveDefaultCronStorePath(): string {
|
||||
return path.join(resolveDefaultCronDir(), "jobs.json");
|
||||
}
|
||||
|
||||
function resolveStatePath(storePath: string): string {
|
||||
return storePath.replace(/\.json$/, "-state.json");
|
||||
}
|
||||
|
||||
type CronStateFileEntry = {
|
||||
updatedAtMs?: number;
|
||||
state?: Record<string, unknown>;
|
||||
};
|
||||
|
||||
type CronStateFile = {
|
||||
version: 1;
|
||||
jobs: Record<string, CronStateFileEntry>;
|
||||
};
|
||||
|
||||
function stripRuntimeOnlyCronFields(store: CronStoreFile): unknown {
|
||||
return {
|
||||
version: store.version,
|
||||
jobs: store.jobs.map((job) => {
|
||||
const { state: _state, updatedAtMs: _updatedAtMs, ...rest } = job;
|
||||
return rest;
|
||||
return { ...rest, state: {} };
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
function parseCronStoreForBackupComparison(raw: string): CronStoreFile | null {
|
||||
try {
|
||||
const parsed = parseJsonWithJson5Fallback(raw);
|
||||
if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
|
||||
return null;
|
||||
}
|
||||
const version = (parsed as { version?: unknown }).version;
|
||||
const jobs = (parsed as { jobs?: unknown }).jobs;
|
||||
if (version !== 1 || !Array.isArray(jobs)) {
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
version: 1,
|
||||
jobs: jobs.filter(Boolean) as CronStoreFile["jobs"],
|
||||
function extractStateFile(store: CronStoreFile): CronStateFile {
|
||||
const jobs: Record<string, CronStateFileEntry> = {};
|
||||
for (const job of store.jobs) {
|
||||
jobs[job.id] = {
|
||||
updatedAtMs: job.updatedAtMs,
|
||||
state: job.state ?? {},
|
||||
};
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function shouldSkipCronBackupForRuntimeOnlyChanges(
|
||||
previousRaw: string | null,
|
||||
nextStore: CronStoreFile,
|
||||
): boolean {
|
||||
if (previousRaw === null) {
|
||||
return false;
|
||||
}
|
||||
const previous = parseCronStoreForBackupComparison(previousRaw);
|
||||
if (!previous) {
|
||||
return false;
|
||||
}
|
||||
return (
|
||||
JSON.stringify(stripRuntimeOnlyCronFields(previous)) ===
|
||||
JSON.stringify(stripRuntimeOnlyCronFields(nextStore))
|
||||
);
|
||||
return { version: 1, jobs };
|
||||
}
|
||||
|
||||
export function resolveCronStorePath(storePath?: string) {
|
||||
@@ -74,6 +62,37 @@ export function resolveCronStorePath(storePath?: string) {
|
||||
return resolveDefaultCronStorePath();
|
||||
}
|
||||
|
||||
async function loadStateFile(statePath: string): Promise<CronStateFile | null> {
|
||||
try {
|
||||
const raw = await fs.promises.readFile(statePath, "utf-8");
|
||||
const parsed = parseJsonWithJson5Fallback(raw);
|
||||
if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
|
||||
return null;
|
||||
}
|
||||
const record = parsed as Record<string, unknown>;
|
||||
if (record.version !== 1 || typeof record.jobs !== "object" || record.jobs === null) {
|
||||
return null;
|
||||
}
|
||||
return { version: 1, jobs: record.jobs as Record<string, CronStateFileEntry> };
|
||||
} catch (err) {
|
||||
if ((err as { code?: unknown })?.code === "ENOENT") {
|
||||
return null;
|
||||
}
|
||||
// Best-effort: if state file is corrupt, treat as absent.
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function hasInlineState(jobs: Array<Record<string, unknown>>): boolean {
|
||||
return jobs.some(
|
||||
(job) =>
|
||||
job.state !== undefined &&
|
||||
typeof job.state === "object" &&
|
||||
job.state !== null &&
|
||||
Object.keys(job.state as Record<string, unknown>).length > 0,
|
||||
);
|
||||
}
|
||||
|
||||
export async function loadCronStore(storePath: string): Promise<CronStoreFile> {
|
||||
try {
|
||||
const raw = await fs.promises.readFile(storePath, "utf-8");
|
||||
@@ -94,11 +113,51 @@ export async function loadCronStore(storePath: string): Promise<CronStoreFile> {
|
||||
version: 1 as const,
|
||||
jobs: jobs.filter(Boolean) as never as CronStoreFile["jobs"],
|
||||
};
|
||||
serializedStoreCache.set(storePath, JSON.stringify(store, null, 2));
|
||||
|
||||
// Load state file and merge.
|
||||
const statePath = resolveStatePath(storePath);
|
||||
const stateFile = await loadStateFile(statePath);
|
||||
|
||||
if (stateFile) {
|
||||
// State file exists: merge state by job ID. Inline state in jobs.json is ignored.
|
||||
for (const job of store.jobs) {
|
||||
const entry = stateFile.jobs[job.id];
|
||||
if (entry) {
|
||||
job.updatedAtMs = entry.updatedAtMs ?? job.updatedAtMs;
|
||||
job.state = (entry.state ?? {}) as never;
|
||||
} else {
|
||||
// Job exists in config but not in state file: default to empty state.
|
||||
if (!job.state || typeof job.state !== "object") {
|
||||
job.state = {} as never;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (!hasInlineState(jobs as unknown as Array<Record<string, unknown>>)) {
|
||||
// No state file, no inline state: fresh clone or first run.
|
||||
for (const job of store.jobs) {
|
||||
job.state = (job.state && typeof job.state === "object" ? job.state : {}) as never;
|
||||
}
|
||||
}
|
||||
// else: migration mode — no state file but jobs.json has inline state. Use as-is.
|
||||
|
||||
// Ensure every job has a state object (defensive).
|
||||
for (const job of store.jobs) {
|
||||
if (!job.state || typeof job.state !== "object") {
|
||||
job.state = {} as never;
|
||||
}
|
||||
}
|
||||
|
||||
const configJson = JSON.stringify(stripRuntimeOnlyCronFields(store), null, 2);
|
||||
serializedStoreCache.set(storePath, configJson);
|
||||
if (stateFile) {
|
||||
serializedStoreCache.set(`${storePath}:state`, JSON.stringify(stateFile, null, 2));
|
||||
}
|
||||
|
||||
return store;
|
||||
} catch (err) {
|
||||
if ((err as { code?: unknown })?.code === "ENOENT") {
|
||||
serializedStoreCache.delete(storePath);
|
||||
serializedStoreCache.delete(`${storePath}:state`);
|
||||
return { version: 1, jobs: [] };
|
||||
}
|
||||
throw err;
|
||||
@@ -113,51 +172,71 @@ async function setSecureFileMode(filePath: string): Promise<void> {
|
||||
await fs.promises.chmod(filePath, 0o600).catch(() => undefined);
|
||||
}
|
||||
|
||||
async function atomicWrite(filePath: string, content: string, dirMode = 0o700): Promise<void> {
|
||||
const dir = path.dirname(filePath);
|
||||
await fs.promises.mkdir(dir, { recursive: true, mode: dirMode });
|
||||
await fs.promises.chmod(dir, dirMode).catch(() => undefined);
|
||||
const tmp = `${filePath}.${process.pid}.${randomBytes(8).toString("hex")}.tmp`;
|
||||
await fs.promises.writeFile(tmp, content, { encoding: "utf-8", mode: 0o600 });
|
||||
await setSecureFileMode(tmp);
|
||||
await renameWithRetry(tmp, filePath);
|
||||
await setSecureFileMode(filePath);
|
||||
}
|
||||
|
||||
export async function saveCronStore(
|
||||
storePath: string,
|
||||
store: CronStoreFile,
|
||||
opts?: SaveCronStoreOptions,
|
||||
) {
|
||||
const storeDir = path.dirname(storePath);
|
||||
await fs.promises.mkdir(storeDir, { recursive: true, mode: 0o700 });
|
||||
await fs.promises.chmod(storeDir, 0o700).catch(() => undefined);
|
||||
const json = JSON.stringify(store, null, 2);
|
||||
const cached = serializedStoreCache.get(storePath);
|
||||
if (cached === json) {
|
||||
const configJson = JSON.stringify(stripRuntimeOnlyCronFields(store), null, 2);
|
||||
const stateFile = extractStateFile(store);
|
||||
const stateJson = JSON.stringify(stateFile, null, 2);
|
||||
|
||||
const statePath = resolveStatePath(storePath);
|
||||
const configCacheKey = storePath;
|
||||
const stateCacheKey = `${storePath}:state`;
|
||||
|
||||
const cachedConfig = serializedStoreCache.get(configCacheKey);
|
||||
const cachedState = serializedStoreCache.get(stateCacheKey);
|
||||
|
||||
const configChanged = cachedConfig !== configJson;
|
||||
const stateChanged = cachedState !== stateJson;
|
||||
|
||||
if (!configChanged && !stateChanged) {
|
||||
return;
|
||||
}
|
||||
|
||||
let previous: string | null = cached ?? null;
|
||||
if (previous === null) {
|
||||
// Detect migration: state file does not exist on disk yet.
|
||||
let migrating = false;
|
||||
if (!cachedState) {
|
||||
try {
|
||||
previous = await fs.promises.readFile(storePath, "utf-8");
|
||||
} catch (err) {
|
||||
if ((err as { code?: unknown }).code !== "ENOENT") {
|
||||
throw err;
|
||||
await fs.promises.access(statePath, fs.constants.F_OK);
|
||||
} catch {
|
||||
migrating = true;
|
||||
}
|
||||
}
|
||||
|
||||
// Write state file first (safer ordering for migration — see PR_DRAFT.md Atomicity).
|
||||
if (stateChanged || migrating) {
|
||||
await atomicWrite(statePath, stateJson);
|
||||
serializedStoreCache.set(stateCacheKey, stateJson);
|
||||
}
|
||||
|
||||
if (configChanged || migrating) {
|
||||
// Determine backup need: only when config actually changed (not migration-only).
|
||||
const skipBackup = opts?.skipBackup === true || !configChanged;
|
||||
if (!skipBackup) {
|
||||
try {
|
||||
const backupPath = `${storePath}.bak`;
|
||||
await fs.promises.copyFile(storePath, backupPath);
|
||||
await setSecureFileMode(backupPath);
|
||||
} catch {
|
||||
// best-effort
|
||||
}
|
||||
}
|
||||
await atomicWrite(storePath, configJson);
|
||||
serializedStoreCache.set(configCacheKey, configJson);
|
||||
}
|
||||
if (previous === json) {
|
||||
serializedStoreCache.set(storePath, json);
|
||||
return;
|
||||
}
|
||||
const skipBackup =
|
||||
opts?.skipBackup === true || shouldSkipCronBackupForRuntimeOnlyChanges(previous, store);
|
||||
const tmp = `${storePath}.${process.pid}.${randomBytes(8).toString("hex")}.tmp`;
|
||||
await fs.promises.writeFile(tmp, json, { encoding: "utf-8", mode: 0o600 });
|
||||
await setSecureFileMode(tmp);
|
||||
if (previous !== null && !skipBackup) {
|
||||
try {
|
||||
const backupPath = `${storePath}.bak`;
|
||||
await fs.promises.copyFile(storePath, backupPath);
|
||||
await setSecureFileMode(backupPath);
|
||||
} catch {
|
||||
// best-effort
|
||||
}
|
||||
}
|
||||
await renameWithRetry(tmp, storePath);
|
||||
await setSecureFileMode(storePath);
|
||||
serializedStoreCache.set(storePath, json);
|
||||
}
|
||||
|
||||
const RENAME_MAX_RETRIES = 3;
|
||||
|
||||
@@ -9,6 +9,7 @@ export type CronJobBase<TSchedule, TSessionTarget, TWakeMode, TPayload, TDeliver
|
||||
deleteAfterRun?: boolean;
|
||||
createdAtMs: number;
|
||||
updatedAtMs: number;
|
||||
configUpdatedAtMs?: number;
|
||||
schedule: TSchedule;
|
||||
sessionTarget: TSessionTarget;
|
||||
wakeMode: TWakeMode;
|
||||
|
||||
Reference in New Issue
Block a user