mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 10:40:43 +00:00
fix(cron): invalidate stale external schedule slots
This commit is contained in:
@@ -30,6 +30,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Cron: resolve failure alerts and failure-destination announcements against `session:<id>` targets before falling back to the creator session, so jobs created from group chats can notify the targeted direct session without cross-account routing errors. Refs #62777; carries forward #68535. Thanks @slideshow-dingo and @likewen-tech.
|
||||
- Discord: preserve explicit `user:` and `channel:` delivery targets through plugin routing so cron announcements and failure alerts keep their intended recipient kind. Refs #62777; carries forward #62798. Thanks @neeravmakwana.
|
||||
- Cron: add `failureAlert.includeSkipped` and `openclaw cron edit --failure-alert-include-skipped` so persistently skipped jobs can alert without counting skips as execution errors or affecting retry backoff. Fixes #60846. Thanks @slideshow-dingo.
|
||||
- Cron: invalidate stale pending runtime slots after live or offline `jobs.json` schedule edits, while preserving due slots for formatting-only rewrites. Fixes #27996 and #71607; carries forward #71651. Thanks @xialonglee and @fagnersouza666.
|
||||
- Cron: classify isolated runs as errors from structured embedded-run execution-denial metadata, with final-output marker fallback for `SYSTEM_RUN_DENIED`, `INVALID_REQUEST`, and approval-binding refusals, so blocked commands no longer appear green in cron history. Fixes #67172; carries forward #67186. Thanks @oc-gh-dr, @hclsys, and @1yihui.
|
||||
- Onboarding/GitHub Copilot: add manifest-owned `--github-copilot-token` support for non-interactive setup, including env fallback, tokenRef storage in ref mode, saved-profile reuse, and current Copilot default-model wiring. Refs #50002 and supersedes #50003. Thanks @scottgl9.
|
||||
- Gateway/install: add a validated `--wrapper`/`OPENCLAW_WRAPPER` service install path that persists executable LaunchAgent/systemd wrappers across forced reinstalls, updates, and doctor repairs instead of falling back to raw node/bun `ProgramArguments`. Fixes #69400. (#72445) Thanks @willtmc.
|
||||
|
||||
@@ -43,6 +43,7 @@ Cron is the Gateway's built-in scheduler. It persists jobs, wakes the agent at t
|
||||
- Job definitions persist at `~/.openclaw/cron/jobs.json` so restarts do not lose schedules.
|
||||
- Runtime execution state persists next to it in `~/.openclaw/cron/jobs-state.json`. If you track cron definitions in git, track `jobs.json` and gitignore `jobs-state.json`.
|
||||
- After the split, older OpenClaw versions can read `jobs.json` but may treat jobs as fresh because runtime fields now live in `jobs-state.json`.
|
||||
- When `jobs.json` is edited while the Gateway is running or stopped, OpenClaw compares the changed schedule fields with pending runtime slot metadata and clears stale `nextRunAtMs` values. Pure formatting or key-order-only rewrites preserve the pending slot.
|
||||
- All cron executions create [background task](/automation/tasks) records.
|
||||
- One-shot jobs (`--at`) auto-delete after success by default.
|
||||
- Isolated cron runs best-effort close tracked browser tabs/processes for their `cron:<jobId>` session when the run completes, so detached browser automation does not leave orphaned processes behind.
|
||||
@@ -399,6 +400,8 @@ Model override note:
|
||||
|
||||
The runtime state sidecar is derived from `cron.store`: a `.json` store such as `~/clawd/cron/jobs.json` uses `~/clawd/cron/jobs-state.json`, while a store path without a `.json` suffix appends `-state.json`.
|
||||
|
||||
If you hand-edit `jobs.json`, leave `jobs-state.json` out of source control. OpenClaw uses that sidecar for pending slots, active markers, last-run metadata, and the schedule identity that tells the scheduler when an externally edited job needs a fresh `nextRunAtMs`.
|
||||
|
||||
Disable cron: `cron.enabled: false` or `OPENCLAW_SKIP_CRON=1`.
|
||||
|
||||
<AccordionGroup>
|
||||
|
||||
@@ -83,6 +83,8 @@ Recurring jobs use exponential retry backoff after consecutive errors: 30s, 1m,
|
||||
|
||||
Skipped runs are tracked separately from execution errors. They do not affect retry backoff, but `openclaw cron edit <job-id> --failure-alert-include-skipped` can opt failure alerts into repeated skipped-run notifications.
|
||||
|
||||
Note: cron job definitions live in `jobs.json`, while pending runtime state lives in `jobs-state.json`. If `jobs.json` is edited externally, the Gateway reloads changed schedules and clears stale pending slots; formatting-only rewrites do not clear the pending slot.
|
||||
|
||||
### Manual runs
|
||||
|
||||
`openclaw cron run` returns as soon as the manual run is queued. Successful responses include `{ ok: true, enqueued: true, runId }`. Use `openclaw cron runs --id <job-id>` to follow the eventual outcome.
|
||||
|
||||
40
src/cron/schedule-identity.ts
Normal file
40
src/cron/schedule-identity.ts
Normal file
@@ -0,0 +1,40 @@
|
||||
import type { CronJob, CronSchedule } from "./types.js";
|
||||
|
||||
function schedulePayload(
|
||||
schedule: CronSchedule,
|
||||
):
|
||||
| { kind: "at"; at: string }
|
||||
| { kind: "every"; everyMs: number; anchorMs?: number }
|
||||
| { kind: "cron"; expr: string; tz?: string; staggerMs?: number } {
|
||||
switch (schedule.kind) {
|
||||
case "at":
|
||||
return { kind: "at", at: schedule.at };
|
||||
case "every":
|
||||
return { kind: "every", everyMs: schedule.everyMs, anchorMs: schedule.anchorMs };
|
||||
case "cron":
|
||||
return {
|
||||
kind: "cron",
|
||||
expr: schedule.expr,
|
||||
tz: schedule.tz,
|
||||
staggerMs: schedule.staggerMs,
|
||||
};
|
||||
}
|
||||
throw new Error("Unsupported cron schedule kind");
|
||||
}
|
||||
|
||||
export function cronScheduleIdentity(
|
||||
job: Pick<CronJob, "schedule"> & { enabled?: boolean },
|
||||
): string {
|
||||
return JSON.stringify({
|
||||
version: 1,
|
||||
enabled: job.enabled ?? true,
|
||||
schedule: schedulePayload(job.schedule),
|
||||
});
|
||||
}
|
||||
|
||||
export function cronSchedulingInputsEqual(
|
||||
previous: Pick<CronJob, "schedule"> & { enabled?: boolean },
|
||||
next: Pick<CronJob, "schedule"> & { enabled?: boolean },
|
||||
): boolean {
|
||||
return cronScheduleIdentity(previous) === cronScheduleIdentity(next);
|
||||
}
|
||||
@@ -2,6 +2,8 @@ import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { setupCronServiceSuite } from "../service.test-harness.js";
|
||||
import { saveCronStore } from "../store.js";
|
||||
import type { CronJob } from "../types.js";
|
||||
import { findJobOrThrow } from "./jobs.js";
|
||||
import { createCronServiceState } from "./state.js";
|
||||
import { ensureLoaded, persist } from "./store.js";
|
||||
@@ -40,6 +42,22 @@ function createStoreTestState(storePath: string) {
|
||||
});
|
||||
}
|
||||
|
||||
function createReloadCronJob(params?: Partial<CronJob>): CronJob {
|
||||
return {
|
||||
id: "reload-cron-expr-job",
|
||||
name: "reload cron expr job",
|
||||
enabled: true,
|
||||
createdAtMs: STORE_TEST_NOW - 60_000,
|
||||
updatedAtMs: STORE_TEST_NOW - 60_000,
|
||||
schedule: { kind: "cron", expr: "0 6 * * *", tz: "UTC" },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "now",
|
||||
payload: { kind: "systemEvent", text: "tick" },
|
||||
state: {},
|
||||
...params,
|
||||
};
|
||||
}
|
||||
|
||||
describe("cron service store seam coverage", () => {
|
||||
it("loads stored jobs, recomputes next runs, and does not rewrite the store on load", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
@@ -189,4 +207,183 @@ describe("cron service store seam coverage", () => {
|
||||
expect.stringContaining("invalid persisted sessionTarget"),
|
||||
);
|
||||
});
|
||||
|
||||
it("clears stale nextRunAtMs after force reload when cron schedule expression changes", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
const staleNextRunAtMs = STORE_TEST_NOW + 3_600_000;
|
||||
|
||||
await saveCronStore(storePath, {
|
||||
version: 1,
|
||||
jobs: [
|
||||
createReloadCronJob({
|
||||
state: { nextRunAtMs: staleNextRunAtMs },
|
||||
}),
|
||||
],
|
||||
});
|
||||
|
||||
const state = createStoreTestState(storePath);
|
||||
await ensureLoaded(state, { skipRecompute: true });
|
||||
expect(findJobOrThrow(state, "reload-cron-expr-job").state.nextRunAtMs).toBe(staleNextRunAtMs);
|
||||
|
||||
await writeSingleJobStore(storePath, {
|
||||
id: "reload-cron-expr-job",
|
||||
name: "reload cron expr job",
|
||||
enabled: true,
|
||||
createdAtMs: STORE_TEST_NOW - 60_000,
|
||||
updatedAtMs: STORE_TEST_NOW - 30_000,
|
||||
schedule: { kind: "cron", expr: "30 6 * * 0,6", tz: "UTC" },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "now",
|
||||
payload: { kind: "systemEvent", text: "tick" },
|
||||
state: {},
|
||||
});
|
||||
|
||||
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
|
||||
|
||||
const reloadedJob = findJobOrThrow(state, "reload-cron-expr-job");
|
||||
expect(reloadedJob.schedule).toEqual({ kind: "cron", expr: "30 6 * * 0,6", tz: "UTC" });
|
||||
expect(reloadedJob.state.nextRunAtMs).toBeUndefined();
|
||||
});
|
||||
|
||||
it("preserves nextRunAtMs after force reload when cron schedule key order changes only", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
const dueNextRunAtMs = STORE_TEST_NOW - 1_000;
|
||||
|
||||
await saveCronStore(storePath, {
|
||||
version: 1,
|
||||
jobs: [
|
||||
createReloadCronJob({
|
||||
state: { nextRunAtMs: dueNextRunAtMs },
|
||||
}),
|
||||
],
|
||||
});
|
||||
|
||||
const state = createStoreTestState(storePath);
|
||||
await ensureLoaded(state, { skipRecompute: true });
|
||||
|
||||
await writeSingleJobStore(storePath, {
|
||||
id: "reload-cron-expr-job",
|
||||
name: "reload cron expr job",
|
||||
enabled: true,
|
||||
createdAtMs: STORE_TEST_NOW - 60_000,
|
||||
updatedAtMs: STORE_TEST_NOW - 30_000,
|
||||
schedule: { expr: "0 6 * * *", kind: "cron", tz: "UTC" },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "now",
|
||||
payload: { kind: "systemEvent", text: "tick" },
|
||||
state: {},
|
||||
});
|
||||
|
||||
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
|
||||
|
||||
expect(findJobOrThrow(state, "reload-cron-expr-job").state.nextRunAtMs).toBe(dueNextRunAtMs);
|
||||
});
|
||||
|
||||
it("preserves nextRunAtMs after force reload when scheduling inputs are unchanged", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
const originalNextRunAtMs = STORE_TEST_NOW + 3_600_000;
|
||||
|
||||
await writeSingleJobStore(storePath, {
|
||||
...createReloadCronJob({ state: { nextRunAtMs: originalNextRunAtMs } }),
|
||||
});
|
||||
|
||||
const state = createStoreTestState(storePath);
|
||||
await ensureLoaded(state, { skipRecompute: true });
|
||||
await writeSingleJobStore(storePath, {
|
||||
...createReloadCronJob({
|
||||
updatedAtMs: STORE_TEST_NOW,
|
||||
state: { nextRunAtMs: originalNextRunAtMs + 60_000 },
|
||||
}),
|
||||
});
|
||||
|
||||
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
|
||||
|
||||
expect(findJobOrThrow(state, "reload-cron-expr-job").state.nextRunAtMs).toBe(
|
||||
originalNextRunAtMs + 60_000,
|
||||
);
|
||||
});
|
||||
|
||||
it("clears stale nextRunAtMs after force reload when enabled state changes", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
const staleNextRunAtMs = STORE_TEST_NOW + 3_600_000;
|
||||
|
||||
await writeSingleJobStore(storePath, {
|
||||
...createReloadCronJob({
|
||||
enabled: true,
|
||||
state: { nextRunAtMs: staleNextRunAtMs },
|
||||
}),
|
||||
});
|
||||
|
||||
const state = createStoreTestState(storePath);
|
||||
await ensureLoaded(state, { skipRecompute: true });
|
||||
await writeSingleJobStore(storePath, {
|
||||
...createReloadCronJob({
|
||||
enabled: false,
|
||||
updatedAtMs: STORE_TEST_NOW,
|
||||
state: { nextRunAtMs: staleNextRunAtMs },
|
||||
}),
|
||||
});
|
||||
|
||||
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
|
||||
|
||||
expect(findJobOrThrow(state, "reload-cron-expr-job").state.nextRunAtMs).toBeUndefined();
|
||||
});
|
||||
|
||||
it("clears stale nextRunAtMs after force reload when every schedule anchor changes", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
const jobId = "reload-every-anchor-job";
|
||||
const staleNextRunAtMs = STORE_TEST_NOW + 3_600_000;
|
||||
|
||||
await writeSingleJobStore(storePath, {
|
||||
...createReloadCronJob({
|
||||
id: jobId,
|
||||
schedule: { kind: "every", everyMs: 60_000, anchorMs: STORE_TEST_NOW - 60_000 },
|
||||
state: { nextRunAtMs: staleNextRunAtMs },
|
||||
}),
|
||||
});
|
||||
|
||||
const state = createStoreTestState(storePath);
|
||||
await ensureLoaded(state, { skipRecompute: true });
|
||||
await writeSingleJobStore(storePath, {
|
||||
...createReloadCronJob({
|
||||
id: jobId,
|
||||
updatedAtMs: STORE_TEST_NOW,
|
||||
schedule: { kind: "every", everyMs: 60_000, anchorMs: STORE_TEST_NOW },
|
||||
state: { nextRunAtMs: staleNextRunAtMs },
|
||||
}),
|
||||
});
|
||||
|
||||
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
|
||||
|
||||
expect(findJobOrThrow(state, jobId).state.nextRunAtMs).toBeUndefined();
|
||||
});
|
||||
|
||||
it("clears stale nextRunAtMs after force reload when at schedule target changes", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
const jobId = "reload-at-target-job";
|
||||
const staleNextRunAtMs = STORE_TEST_NOW + 3_600_000;
|
||||
|
||||
await writeSingleJobStore(storePath, {
|
||||
...createReloadCronJob({
|
||||
id: jobId,
|
||||
schedule: { kind: "at", at: "2026-03-23T13:00:00.000Z" },
|
||||
state: { nextRunAtMs: staleNextRunAtMs },
|
||||
}),
|
||||
});
|
||||
|
||||
const state = createStoreTestState(storePath);
|
||||
await ensureLoaded(state, { skipRecompute: true });
|
||||
await writeSingleJobStore(storePath, {
|
||||
...createReloadCronJob({
|
||||
id: jobId,
|
||||
updatedAtMs: STORE_TEST_NOW,
|
||||
schedule: { kind: "at", at: "2026-03-23T14:00:00.000Z" },
|
||||
state: { nextRunAtMs: staleNextRunAtMs },
|
||||
}),
|
||||
});
|
||||
|
||||
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
|
||||
|
||||
expect(findJobOrThrow(state, jobId).state.nextRunAtMs).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,12 +1,25 @@
|
||||
import fs from "node:fs";
|
||||
import { normalizeCronJobIdentityFields } from "../normalize-job-identity.js";
|
||||
import { normalizeCronJobInput } from "../normalize.js";
|
||||
import { cronSchedulingInputsEqual } from "../schedule-identity.js";
|
||||
import { isInvalidCronSessionTargetIdError } from "../session-target.js";
|
||||
import { loadCronStore, saveCronStore } from "../store.js";
|
||||
import type { CronJob } from "../types.js";
|
||||
import { recomputeNextRuns } from "./jobs.js";
|
||||
import type { CronServiceState } from "./state.js";
|
||||
|
||||
function invalidateStaleNextRunOnScheduleChange(params: {
|
||||
previousJobsById: ReadonlyMap<string, CronJob>;
|
||||
hydrated: CronJob;
|
||||
}) {
|
||||
const previousJob = params.previousJobsById.get(params.hydrated.id);
|
||||
if (!previousJob || cronSchedulingInputsEqual(previousJob, params.hydrated)) {
|
||||
return;
|
||||
}
|
||||
params.hydrated.state ??= {};
|
||||
params.hydrated.state.nextRunAtMs = undefined;
|
||||
}
|
||||
|
||||
async function getFileMtimeMs(path: string): Promise<number | null> {
|
||||
try {
|
||||
const stats = await fs.promises.stat(path);
|
||||
@@ -30,6 +43,10 @@ export async function ensureLoaded(
|
||||
if (state.store && !opts?.forceReload) {
|
||||
return;
|
||||
}
|
||||
const previousJobsById = new Map<string, CronJob>();
|
||||
for (const job of state.store?.jobs ?? []) {
|
||||
previousJobsById.set(job.id, job);
|
||||
}
|
||||
// Force reload always re-reads the file to avoid missing cross-service
|
||||
// edits on filesystems with coarse mtime resolution.
|
||||
|
||||
@@ -67,6 +84,7 @@ export async function ensureLoaded(
|
||||
if (typeof hydrated.enabled !== "boolean") {
|
||||
hydrated.enabled = true;
|
||||
}
|
||||
invalidateStaleNextRunOnScheduleChange({ previousJobsById, hydrated });
|
||||
// Same shape: persisted jobs missing `sessionTarget` crash downstream
|
||||
// on any code path that dereferences `.startsWith` (e.g.
|
||||
// `runIsolatedAgentJob` in `src/gateway/server-cron.ts`). Mirror the
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
import fs from "node:fs/promises";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { setupCronServiceSuite, writeCronStoreSnapshot } from "../../cron/service.test-harness.js";
|
||||
import { createCronServiceState } from "../../cron/service/state.js";
|
||||
import { onTimer } from "../../cron/service/timer.js";
|
||||
import { loadCronStore } from "../../cron/store.js";
|
||||
import { loadCronStore, saveCronStore } from "../../cron/store.js";
|
||||
import type { CronJob } from "../../cron/types.js";
|
||||
import * as detachedTaskRuntime from "../../tasks/detached-task-runtime.js";
|
||||
import { findTaskByRunId, resetTaskRegistryForTests } from "../../tasks/task-registry.js";
|
||||
@@ -130,4 +131,57 @@ describe("cron service timer seam coverage", () => {
|
||||
|
||||
createTaskRecordSpy.mockRestore();
|
||||
});
|
||||
|
||||
it("reloads externally edited split-store schedules without firing stale slots", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
const now = Date.parse("2026-03-23T06:00:00.000Z");
|
||||
const staleNextRunAtMs = now;
|
||||
const enqueueSystemEvent = vi.fn();
|
||||
const requestHeartbeatNow = vi.fn();
|
||||
|
||||
await saveCronStore(storePath, {
|
||||
version: 1,
|
||||
jobs: [
|
||||
{
|
||||
id: "externally-edited-cron",
|
||||
name: "externally edited cron",
|
||||
enabled: true,
|
||||
createdAtMs: now - 60_000,
|
||||
updatedAtMs: now - 60_000,
|
||||
schedule: { kind: "cron", expr: "0 6 * * *", tz: "UTC" },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "now",
|
||||
payload: { kind: "systemEvent", text: "stale schedule should not run" },
|
||||
state: { nextRunAtMs: staleNextRunAtMs },
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
const config = JSON.parse(await fs.readFile(storePath, "utf8")) as {
|
||||
jobs: Array<Record<string, unknown>>;
|
||||
};
|
||||
config.jobs[0].schedule = { kind: "cron", expr: "0 7 * * *", tz: "UTC" };
|
||||
await fs.writeFile(storePath, JSON.stringify(config, null, 2), "utf8");
|
||||
|
||||
const state = createCronServiceState({
|
||||
storePath,
|
||||
cronEnabled: true,
|
||||
log: logger,
|
||||
nowMs: () => now,
|
||||
enqueueSystemEvent,
|
||||
requestHeartbeatNow,
|
||||
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })),
|
||||
});
|
||||
|
||||
await onTimer(state);
|
||||
|
||||
expect(enqueueSystemEvent).not.toHaveBeenCalled();
|
||||
expect(requestHeartbeatNow).not.toHaveBeenCalled();
|
||||
|
||||
const persisted = await loadCronStore(storePath);
|
||||
const job = persisted.jobs[0];
|
||||
expect(job?.schedule).toEqual({ kind: "cron", expr: "0 7 * * *", tz: "UTC" });
|
||||
expect(job?.state.lastStatus).toBeUndefined();
|
||||
expect(job?.state.nextRunAtMs).toBe(Date.parse("2026-03-23T07:00:00.000Z"));
|
||||
});
|
||||
});
|
||||
|
||||
@@ -199,10 +199,53 @@ describe("cron store", () => {
|
||||
expect(stateFile.jobs[first.jobs[0].id].state.nextRunAtMs).toBe(
|
||||
first.jobs[0].createdAtMs + 60_000,
|
||||
);
|
||||
expect(typeof stateFile.jobs[first.jobs[0].id].scheduleIdentity).toBe("string");
|
||||
|
||||
await expect(fs.stat(`${store.storePath}.bak`)).rejects.toThrow();
|
||||
});
|
||||
|
||||
it("drops stale split runtime nextRunAtMs when schedule identity changes across restart", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
const payload = makeStore("job-restart-drift", true);
|
||||
const staleNextRunAtMs = payload.jobs[0].createdAtMs + 3_600_000;
|
||||
payload.jobs[0].schedule = { kind: "cron", expr: "0 6 * * *", tz: "UTC" };
|
||||
payload.jobs[0].state = { nextRunAtMs: staleNextRunAtMs };
|
||||
|
||||
await saveCronStore(storePath, payload);
|
||||
|
||||
const config = JSON.parse(await fs.readFile(storePath, "utf-8")) as {
|
||||
jobs: Array<Record<string, unknown>>;
|
||||
};
|
||||
config.jobs[0].schedule = { kind: "cron", expr: "30 6 * * 0,6", tz: "UTC" };
|
||||
await fs.writeFile(storePath, JSON.stringify(config, null, 2), "utf-8");
|
||||
|
||||
const loaded = await loadCronStore(storePath);
|
||||
|
||||
expect(loaded.jobs[0]?.schedule).toEqual({ kind: "cron", expr: "30 6 * * 0,6", tz: "UTC" });
|
||||
expect(loaded.jobs[0]?.state.nextRunAtMs).toBeUndefined();
|
||||
});
|
||||
|
||||
it("drops stale split runtime nextRunAtMs in sync loads when schedule identity changes", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
const payload = makeStore("job-sync-restart-drift", true);
|
||||
const staleNextRunAtMs = payload.jobs[0].createdAtMs + 3_600_000;
|
||||
payload.jobs[0].schedule = { kind: "every", everyMs: 60_000, anchorMs: 1 };
|
||||
payload.jobs[0].state = { nextRunAtMs: staleNextRunAtMs };
|
||||
|
||||
await saveCronStore(storePath, payload);
|
||||
|
||||
const config = JSON.parse(await fs.readFile(storePath, "utf-8")) as {
|
||||
jobs: Array<Record<string, unknown>>;
|
||||
};
|
||||
config.jobs[0].schedule = { kind: "every", everyMs: 60_000, anchorMs: 2 };
|
||||
await fs.writeFile(storePath, JSON.stringify(config, null, 2), "utf-8");
|
||||
|
||||
const loaded = loadCronStoreSync(storePath);
|
||||
|
||||
expect(loaded.jobs[0]?.schedule).toEqual({ kind: "every", everyMs: 60_000, anchorMs: 2 });
|
||||
expect(loaded.jobs[0]?.state.nextRunAtMs).toBeUndefined();
|
||||
});
|
||||
|
||||
it("keeps state separate for custom store paths without a json suffix", async () => {
|
||||
const store = await makeStorePath();
|
||||
const storePath = store.storePath.replace(/\.json$/, "");
|
||||
|
||||
@@ -4,6 +4,7 @@ import path from "node:path";
|
||||
import { expandHomePrefix } from "../infra/home-dir.js";
|
||||
import { resolveConfigDir } from "../utils.js";
|
||||
import { parseJsonWithJson5Fallback } from "../utils/parse-json-compat.js";
|
||||
import { cronScheduleIdentity } from "./schedule-identity.js";
|
||||
import type { CronStoreFile } from "./types.js";
|
||||
|
||||
type SerializedStoreCacheEntry = {
|
||||
@@ -40,6 +41,7 @@ function resolveStatePath(storePath: string): string {
|
||||
|
||||
type CronStateFileEntry = {
|
||||
updatedAtMs?: number;
|
||||
scheduleIdentity?: string;
|
||||
state?: Record<string, unknown>;
|
||||
};
|
||||
|
||||
@@ -63,6 +65,7 @@ function extractStateFile(store: CronStoreFile): CronStateFile {
|
||||
for (const job of store.jobs) {
|
||||
jobs[job.id] = {
|
||||
updatedAtMs: job.updatedAtMs,
|
||||
scheduleIdentity: cronScheduleIdentity(job),
|
||||
state: job.state ?? {},
|
||||
};
|
||||
}
|
||||
@@ -183,6 +186,18 @@ function resolveUpdatedAtMs(job: CronStoreFile["jobs"][number], updatedAtMs: unk
|
||||
: Date.now();
|
||||
}
|
||||
|
||||
function mergeStateFileEntry(job: CronStoreFile["jobs"][number], entry: CronStateFileEntry): void {
|
||||
job.updatedAtMs = resolveUpdatedAtMs(job, entry.updatedAtMs);
|
||||
job.state = (entry.state ?? {}) as never;
|
||||
if (
|
||||
typeof entry.scheduleIdentity === "string" &&
|
||||
entry.scheduleIdentity !== cronScheduleIdentity(job)
|
||||
) {
|
||||
ensureJobStateObject(job);
|
||||
job.state.nextRunAtMs = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
export async function loadCronStore(storePath: string): Promise<CronStoreFile> {
|
||||
try {
|
||||
const raw = await fs.promises.readFile(storePath, "utf-8");
|
||||
@@ -215,8 +230,7 @@ export async function loadCronStore(storePath: string): Promise<CronStoreFile> {
|
||||
for (const job of store.jobs) {
|
||||
const entry = stateFile.jobs[job.id];
|
||||
if (entry) {
|
||||
job.updatedAtMs = resolveUpdatedAtMs(job, entry.updatedAtMs);
|
||||
job.state = (entry.state ?? {}) as never;
|
||||
mergeStateFileEntry(job, entry);
|
||||
} else {
|
||||
backfillMissingRuntimeFields(job);
|
||||
}
|
||||
@@ -281,8 +295,7 @@ export function loadCronStoreSync(storePath: string): CronStoreFile {
|
||||
for (const job of store.jobs) {
|
||||
const entry = stateFile.jobs[job.id];
|
||||
if (entry) {
|
||||
job.updatedAtMs = resolveUpdatedAtMs(job, entry.updatedAtMs);
|
||||
job.state = (entry.state ?? {}) as never;
|
||||
mergeStateFileEntry(job, entry);
|
||||
} else {
|
||||
backfillMissingRuntimeFields(job);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user