mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 12:50:42 +00:00
cron: recover missing split state sidecar
This commit is contained in:
@@ -186,6 +186,63 @@ describe("cron store", () => {
|
||||
expect(loaded.jobs[0]?.state.nextRunAtMs).toBe(first.jobs[0].createdAtMs + 60_000);
|
||||
});
|
||||
|
||||
it("recreates a missing state sidecar without rewriting unchanged config", async () => {
|
||||
const store = await makeStorePath();
|
||||
const statePath = store.storePath.replace(/\.json$/, "-state.json");
|
||||
const payload = makeStore("job-1", true);
|
||||
payload.jobs[0].state = { nextRunAtMs: payload.jobs[0].createdAtMs + 60_000 };
|
||||
|
||||
await saveCronStore(store.storePath, payload);
|
||||
await loadCronStore(store.storePath);
|
||||
const configRawBefore = await fs.readFile(store.storePath, "utf-8");
|
||||
await fs.rm(statePath);
|
||||
|
||||
const renamedDestinations: string[] = [];
|
||||
const origRename = fs.rename.bind(fs);
|
||||
const spy = vi.spyOn(fs, "rename").mockImplementation(async (src, dest) => {
|
||||
renamedDestinations.push(String(dest));
|
||||
return origRename(src, dest);
|
||||
});
|
||||
|
||||
try {
|
||||
await saveCronStore(store.storePath, payload);
|
||||
} finally {
|
||||
spy.mockRestore();
|
||||
}
|
||||
|
||||
const configRawAfter = await fs.readFile(store.storePath, "utf-8");
|
||||
const stateFile = JSON.parse(await fs.readFile(statePath, "utf-8"));
|
||||
|
||||
expect(configRawAfter).toBe(configRawBefore);
|
||||
expect(renamedDestinations).toContain(statePath);
|
||||
expect(renamedDestinations).not.toContain(store.storePath);
|
||||
expect(stateFile.jobs["job-1"].state.nextRunAtMs).toBe(payload.jobs[0].createdAtMs + 60_000);
|
||||
});
|
||||
|
||||
it("migrates legacy inline state into the state sidecar", async () => {
|
||||
const store = await makeStorePath();
|
||||
const statePath = store.storePath.replace(/\.json$/, "-state.json");
|
||||
const legacy = makeStore("job-1", true);
|
||||
legacy.jobs[0].state = {
|
||||
lastRunAtMs: legacy.jobs[0].createdAtMs + 30_000,
|
||||
nextRunAtMs: legacy.jobs[0].createdAtMs + 60_000,
|
||||
};
|
||||
|
||||
await fs.mkdir(path.dirname(store.storePath), { recursive: true });
|
||||
await fs.writeFile(store.storePath, JSON.stringify(legacy, null, 2), "utf-8");
|
||||
|
||||
const loaded = await loadCronStore(store.storePath);
|
||||
await saveCronStore(store.storePath, loaded);
|
||||
|
||||
const config = JSON.parse(await fs.readFile(store.storePath, "utf-8"));
|
||||
const stateFile = JSON.parse(await fs.readFile(statePath, "utf-8"));
|
||||
|
||||
expect(config.jobs[0]).not.toHaveProperty("updatedAtMs");
|
||||
expect(config.jobs[0].state).toEqual({});
|
||||
expect(stateFile.jobs["job-1"].updatedAtMs).toBe(legacy.jobs[0].updatedAtMs);
|
||||
expect(stateFile.jobs["job-1"].state.nextRunAtMs).toBe(legacy.jobs[0].createdAtMs + 60_000);
|
||||
});
|
||||
|
||||
it("sanitizes invalid updatedAtMs values from the state sidecar", async () => {
|
||||
const store = await makeStorePath();
|
||||
const job = makeStore("job-1", true).jobs[0];
|
||||
|
||||
@@ -7,6 +7,7 @@ import { parseJsonWithJson5Fallback } from "../utils/parse-json-compat.js";
|
||||
import type { CronStoreFile } from "./types.js";
|
||||
|
||||
const serializedStoreCache = new Map<string, string>();
|
||||
const storesNeedingSplitMigration = new Set<string>();
|
||||
|
||||
function resolveDefaultCronDir(): string {
|
||||
return path.join(resolveConfigDir(), "cron");
|
||||
@@ -146,6 +147,8 @@ export async function loadCronStore(storePath: string): Promise<CronStoreFile> {
|
||||
// Load state file and merge.
|
||||
const statePath = resolveStatePath(storePath);
|
||||
const stateFile = await loadStateFile(statePath);
|
||||
const hasLegacyInlineState =
|
||||
!stateFile && hasInlineState(jobs as unknown as Array<Record<string, unknown>>);
|
||||
|
||||
if (stateFile) {
|
||||
// State file exists: merge state by job ID. Inline state in jobs.json is ignored.
|
||||
@@ -158,7 +161,7 @@ export async function loadCronStore(storePath: string): Promise<CronStoreFile> {
|
||||
backfillMissingRuntimeFields(job);
|
||||
}
|
||||
}
|
||||
} else if (!hasInlineState(jobs as unknown as Array<Record<string, unknown>>)) {
|
||||
} else if (!hasLegacyInlineState) {
|
||||
// No state file, no inline state: fresh clone or first run.
|
||||
for (const job of store.jobs) {
|
||||
backfillMissingRuntimeFields(job);
|
||||
@@ -172,9 +175,13 @@ export async function loadCronStore(storePath: string): Promise<CronStoreFile> {
|
||||
}
|
||||
|
||||
const configJson = JSON.stringify(stripRuntimeOnlyCronFields(store), null, 2);
|
||||
const stateJson = JSON.stringify(extractStateFile(store), null, 2);
|
||||
serializedStoreCache.set(storePath, configJson);
|
||||
if (stateFile) {
|
||||
serializedStoreCache.set(`${storePath}:state`, JSON.stringify(stateFile, null, 2));
|
||||
serializedStoreCache.set(`${storePath}:state`, stateJson);
|
||||
if (hasLegacyInlineState) {
|
||||
storesNeedingSplitMigration.add(storePath);
|
||||
} else {
|
||||
storesNeedingSplitMigration.delete(storePath);
|
||||
}
|
||||
|
||||
return store;
|
||||
@@ -182,6 +189,7 @@ export async function loadCronStore(storePath: string): Promise<CronStoreFile> {
|
||||
if ((err as { code?: unknown })?.code === "ENOENT") {
|
||||
serializedStoreCache.delete(storePath);
|
||||
serializedStoreCache.delete(`${storePath}:state`);
|
||||
storesNeedingSplitMigration.delete(storePath);
|
||||
return { version: 1, jobs: [] };
|
||||
}
|
||||
throw err;
|
||||
@@ -223,23 +231,28 @@ export async function saveCronStore(
|
||||
|
||||
const configChanged = cachedConfig !== configJson;
|
||||
const stateChanged = cachedState !== stateJson;
|
||||
const migrating = storesNeedingSplitMigration.has(storePath);
|
||||
let stateNeedsWrite = stateChanged;
|
||||
|
||||
if (!configChanged && !stateChanged) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Detect migration: state file does not exist on disk yet.
|
||||
let migrating = false;
|
||||
if (!cachedState) {
|
||||
if (!stateNeedsWrite) {
|
||||
try {
|
||||
await fs.promises.access(statePath, fs.constants.F_OK);
|
||||
} catch {
|
||||
migrating = true;
|
||||
const diskStateJson = await fs.promises.readFile(statePath, "utf-8");
|
||||
stateNeedsWrite = diskStateJson !== stateJson;
|
||||
} catch (err) {
|
||||
if ((err as { code?: unknown })?.code === "ENOENT") {
|
||||
stateNeedsWrite = true;
|
||||
} else {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!configChanged && !stateNeedsWrite && !migrating) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Write state first so migration never leaves stripped config without runtime state.
|
||||
if (stateChanged || migrating) {
|
||||
if (stateNeedsWrite || migrating) {
|
||||
await atomicWrite(statePath, stateJson);
|
||||
serializedStoreCache.set(stateCacheKey, stateJson);
|
||||
}
|
||||
@@ -259,6 +272,7 @@ export async function saveCronStore(
|
||||
await atomicWrite(storePath, configJson);
|
||||
serializedStoreCache.set(storePath, configJson);
|
||||
}
|
||||
storesNeedingSplitMigration.delete(storePath);
|
||||
}
|
||||
|
||||
const RENAME_MAX_RETRIES = 3;
|
||||
|
||||
Reference in New Issue
Block a user