fix(cron): preserve unsupported payload rows on writes

This commit is contained in:
IWhatsskill
2026-05-25 11:16:00 +02:00
committed by Peter Steinberger
parent 9330b76a51
commit c916906584
6 changed files with 244 additions and 14 deletions

View File

@@ -170,6 +170,29 @@ describe("normalizeStoredCronJobs", () => {
expect(result.jobs.map((job) => job.id)).toEqual(["valid"]);
});
it("does not normalize unsupported payload kinds into runnable cron jobs", () => {
const jobs = [
makeLegacyJob({
id: "legacy-command-kind",
schedule: { kind: "every", everyMs: 60_000, anchorMs: 1 },
payload: { kind: "command", command: "echo daily" },
}),
makeLegacyJob({
id: "legacy-agentmessage-kind",
schedule: { kind: "cron", expr: "0 9 * * *", tz: "UTC" },
sessionTarget: "isolated",
payload: { kind: "agentmessage", message: "summarize" },
}),
];
const result = normalizeStoredCronJobs(jobs);
expect(result.mutated).toBe(true);
expect(result.issues.invalidPayload).toBe(2);
expect(jobs).toEqual([]);
expect(result.jobs).toEqual([]);
});
it("normalizes whitespace-padded and non-canonical payload kinds", () => {
const jobs = [
{

View File

@@ -247,6 +247,7 @@ export function createMockCronStateForJobs(params: {
warnedDisabled: false,
warnedMissingSessionTargetJobIds: new Set<string>(),
warnedInvalidPersistedJobKeys: new Set<string>(),
preservedInvalidPersistedJobs: [],
deps: {
storePath: "/mock/path",
cronEnabled: true,

View File

@@ -1,6 +1,7 @@
import type { CronConfig } from "../../config/types.cron.js";
import type { HeartbeatRunResult, HeartbeatWakeRequest } from "../../infra/heartbeat-wake.js";
import type { DeliveryContext } from "../../utils/delivery-context.types.js";
import type { PreservedCronConfigJob } from "../store.js";
import type {
CronAgentExecutionPhaseUpdate,
CronAgentExecutionStarted,
@@ -168,6 +169,11 @@ export type CronServiceState = {
* until doctor/fix or an explicit config write repairs the store.
*/
warnedInvalidPersistedJobKeys: Set<string>;
/**
* Raw persisted config rows that are skipped for runtime safety but must not
* be deleted by routine cron-store writes.
*/
preservedInvalidPersistedJobs: PreservedCronConfigJob[];
storeLoadedAtMs: number | null;
storeFileMtimeMs: number | null;
};
@@ -182,6 +188,7 @@ export function createCronServiceState(deps: CronServiceDeps): CronServiceState
warnedDisabled: false,
warnedMissingSessionTargetJobIds: new Set<string>(),
warnedInvalidPersistedJobKeys: new Set<string>(),
preservedInvalidPersistedJobs: [],
storeLoadedAtMs: null,
storeFileMtimeMs: null,
};

View File

@@ -15,13 +15,17 @@ const { logger, makeStorePath } = setupCronServiceSuite({
const STORE_TEST_NOW = Date.parse("2026-03-23T12:00:00.000Z");
async function writeSingleJobStore(storePath: string, job: Record<string, unknown>) {
await writeJobStore(storePath, [job]);
}
async function writeJobStore(storePath: string, jobs: Array<Record<string, unknown>>) {
await fs.mkdir(path.dirname(storePath), { recursive: true });
await fs.writeFile(
storePath,
JSON.stringify(
{
version: 1,
jobs: [job],
jobs,
},
null,
2,
@@ -130,6 +134,95 @@ describe("cron service store seam coverage", () => {
expect((state.storeFileMtimeMs ?? 0) >= (firstMtime ?? 0)).toBe(true);
});
it("preserves unsupported payload-kind rows across full persistence without loading them", async () => {
const { storePath } = await makeStorePath();
await writeJobStore(storePath, [
{
id: "valid-job",
name: "valid job",
enabled: true,
createdAtMs: STORE_TEST_NOW - 60_000,
updatedAtMs: STORE_TEST_NOW - 60_000,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "main",
wakeMode: "now",
payload: { kind: "systemEvent", text: "tick" },
state: {},
},
{
id: "legacy-command",
name: "legacy command",
enabled: true,
createdAtMs: STORE_TEST_NOW - 60_000,
updatedAtMs: STORE_TEST_NOW - 60_000,
schedule: { kind: "cron", expr: "0 8 * * *", tz: "UTC" },
sessionTarget: "main",
wakeMode: "now",
payload: { kind: "command", command: "echo daily" },
state: { lastRunAtMs: STORE_TEST_NOW - 3_600_000 },
},
{
id: "legacy-agentmessage",
name: "legacy agentmessage",
enabled: true,
createdAtMs: STORE_TEST_NOW - 60_000,
schedule: { kind: "cron", expr: "0 9 * * *", tz: "UTC" },
sessionTarget: "isolated",
wakeMode: "now",
payload: { kind: "agentmessage", message: "summarize" },
metadata: { preserve: { nested: true } },
},
]);
const state = createStoreTestState(storePath);
await ensureLoaded(state, { skipRecompute: true });
expect(state.store?.jobs.map((job) => job.id)).toEqual(["valid-job"]);
expect(() => findJobOrThrow(state, "legacy-command")).toThrow(/unknown cron job id/);
expect(() => findJobOrThrow(state, "legacy-agentmessage")).toThrow(/unknown cron job id/);
const valid = findJobOrThrow(state, "valid-job");
valid.name = "valid job renamed";
await persist(state);
const config = JSON.parse(await fs.readFile(storePath, "utf8")) as {
jobs: Array<Record<string, unknown>>;
};
expect(config.jobs.map((job) => job.id)).toEqual([
"valid-job",
"legacy-command",
"legacy-agentmessage",
]);
expect(config.jobs[0]?.name).toBe("valid job renamed");
expect(config.jobs[1]).toMatchObject({
id: "legacy-command",
payload: { kind: "command", command: "echo daily" },
state: { lastRunAtMs: STORE_TEST_NOW - 3_600_000 },
});
expect(config.jobs[2]).toMatchObject({
id: "legacy-agentmessage",
payload: { kind: "agentmessage", message: "summarize" },
metadata: { preserve: { nested: true } },
});
expect(config.jobs[2]).not.toHaveProperty("state");
expect(config.jobs[2]).not.toHaveProperty("updatedAtMs");
const stateFile = JSON.parse(
await fs.readFile(storePath.replace(/\.json$/, "-state.json"), "utf8"),
) as { jobs: Record<string, unknown> };
expect(Object.keys(stateFile.jobs)).toEqual(["valid-job"]);
const invalidPayloadWarns = logger.warn.mock.calls.filter((call) => {
const msg = typeof call[1] === "string" ? call[1] : "";
return msg.includes("skipped invalid persisted job");
});
expect(invalidPayloadWarns.map((call) => (call[0] as { jobId?: string }).jobId)).toEqual([
"legacy-command",
"legacy-agentmessage",
]);
});
it("normalizes jobId-only jobs in memory so scheduler lookups resolve by stable id", async () => {
const { storePath } = await makeStorePath();

View File

@@ -4,7 +4,11 @@ import { normalizeCronJobInput } from "../normalize.js";
import { getInvalidPersistedCronJobReason } from "../persisted-shape.js";
import { cronSchedulingInputsEqual } from "../schedule-identity.js";
import { isInvalidCronSessionTargetIdError } from "../session-target.js";
import { loadCronStore, saveCronStore } from "../store.js";
import {
loadCronStoreWithConfigJobs,
saveCronStore,
type PreservedCronConfigJob,
} from "../store.js";
import type { CronJob } from "../types.js";
import { recomputeNextRuns } from "./jobs.js";
import type { CronServiceState } from "./state.js";
@@ -44,6 +48,21 @@ function warnInvalidPersistedCronJob(params: {
);
}
function isRecord(value: unknown): value is Record<string, unknown> {
return !!value && typeof value === "object" && !Array.isArray(value);
}
function hasUnsupportedStringPayloadKind(candidate: Record<string, unknown>): boolean {
const payload = candidate.payload;
if (!isRecord(payload)) {
return false;
}
const kind = payload.kind;
return (
typeof kind === "string" && kind.trim() !== "" && kind !== "systemEvent" && kind !== "agentTurn"
);
}
async function getFileMtimeMs(path: string): Promise<number | null> {
try {
const stats = await fs.promises.stat(path);
@@ -75,11 +94,13 @@ export async function ensureLoaded(
// edits on filesystems with coarse mtime resolution.
const fileMtimeMs = await getFileMtimeMs(state.deps.storePath);
const loaded = await loadCronStore(state.deps.storePath);
const loadedJobs = (loaded.jobs ?? []) as unknown as CronJob[];
const loaded = await loadCronStoreWithConfigJobs(state.deps.storePath);
const loadedJobs = (loaded.store.jobs ?? []) as unknown as CronJob[];
const jobs: CronJob[] = [];
const preservedInvalidPersistedJobs: PreservedCronConfigJob[] = [];
for (const [index, job] of loadedJobs.entries()) {
const raw = job as unknown as Record<string, unknown>;
const rawConfigJob = loaded.configJobs[index] ?? structuredClone(raw);
const { legacyJobIdIssue } = normalizeCronJobIdentityFields(raw);
let normalized: Record<string, unknown> | null;
try {
@@ -100,6 +121,9 @@ export async function ensureLoaded(
hydrated as unknown as Record<string, unknown>,
);
if (invalidReason) {
if (invalidReason === "invalid-payload" && hasUnsupportedStringPayloadKind(rawConfigJob)) {
preservedInvalidPersistedJobs.push({ index, job: rawConfigJob });
}
warnInvalidPersistedCronJob({ state, raw, index, reason: invalidReason });
continue;
}
@@ -159,6 +183,7 @@ export async function ensureLoaded(
version: 1,
jobs,
};
state.preservedInvalidPersistedJobs = preservedInvalidPersistedJobs;
state.storeLoadedAtMs = state.deps.nowMs();
state.storeFileMtimeMs = fileMtimeMs;
@@ -188,7 +213,10 @@ export async function persist(
if (!state.store) {
return;
}
await saveCronStore(state.deps.storePath, state.store, opts);
await saveCronStore(state.deps.storePath, state.store, {
...opts,
preservedConfigJobs: state.preservedInvalidPersistedJobs,
});
// Update file mtime after save to prevent immediate reload
state.storeFileMtimeMs = await getFileMtimeMs(state.deps.storePath);
}

View File

@@ -13,6 +13,16 @@ type SerializedStoreCacheEntry = {
needsSplitMigration: boolean;
};
export type PreservedCronConfigJob = {
index: number;
job: Record<string, unknown>;
};
export type LoadedCronStore = {
store: CronStoreFile;
configJobs: Array<Record<string, unknown>>;
};
const serializedStoreCache = new Map<string, SerializedStoreCacheEntry>();
function getSerializedStoreCache(storePath: string): SerializedStoreCacheEntry {
@@ -66,13 +76,71 @@ function normalizeCronStoreFile(parsed: unknown): CronStoreFile {
};
}
function stripRuntimeOnlyCronFields(store: CronStoreFile): unknown {
function cloneConfigJobs(jobs: Array<Record<string, unknown>>): Array<Record<string, unknown>> {
return jobs.map((job) => structuredClone(job));
}
function stripJobRuntimeFields(job: CronStoreFile["jobs"][number]): Record<string, unknown> {
const { state: _state, updatedAtMs: _updatedAtMs, ...rest } = job;
return { ...rest, state: {} };
}
function persistedJobId(job: Record<string, unknown>): string | null {
const id = job.id;
return typeof id === "string" && id.trim() ? id : null;
}
function mergePreservedConfigJobs(
jobs: Array<Record<string, unknown>>,
preservedConfigJobs: PreservedCronConfigJob[] | undefined,
): Array<Record<string, unknown>> {
if (!preservedConfigJobs?.length) {
return jobs;
}
const occupiedIds = new Set<string>();
for (const job of jobs) {
const id = persistedJobId(job);
if (id) {
occupiedIds.add(id);
}
}
const seenPreservedIds = new Set<string>();
const preserved = preservedConfigJobs
.filter((entry) => {
const id = persistedJobId(entry.job);
if (!id) {
return true;
}
if (occupiedIds.has(id) || seenPreservedIds.has(id)) {
return false;
}
seenPreservedIds.add(id);
return true;
})
.toSorted((a, b) => a.index - b.index);
if (preserved.length === 0) {
return jobs;
}
const merged = jobs.slice();
for (const entry of preserved) {
const index = Math.max(0, Math.min(entry.index, merged.length));
merged.splice(index, 0, { ...entry.job });
}
return merged;
}
function stripRuntimeOnlyCronFields(
store: CronStoreFile,
preservedConfigJobs?: PreservedCronConfigJob[],
): unknown {
const jobs = store.jobs.map(stripJobRuntimeFields);
return {
version: store.version,
jobs: store.jobs.map((job) => {
const { state: _state, updatedAtMs: _updatedAtMs, ...rest } = job;
return { ...rest, state: {} };
}),
jobs: mergePreservedConfigJobs(jobs, preservedConfigJobs),
};
}
@@ -213,7 +281,7 @@ function mergeStateFileEntry(job: CronStoreFile["jobs"][number], entry: unknown)
}
}
export async function loadCronStore(storePath: string): Promise<CronStoreFile> {
export async function loadCronStoreWithConfigJobs(storePath: string): Promise<LoadedCronStore> {
try {
const raw = await fs.promises.readFile(storePath, "utf-8");
let parsed: unknown;
@@ -226,6 +294,7 @@ export async function loadCronStore(storePath: string): Promise<CronStoreFile> {
}
const store = normalizeCronStoreFile(parsed);
const jobs = store.jobs as unknown as Array<Record<string, unknown>>;
const configJobs = cloneConfigJobs(jobs);
// Load state file and merge.
const statePath = resolveStatePath(storePath);
@@ -263,16 +332,20 @@ export async function loadCronStore(storePath: string): Promise<CronStoreFile> {
needsSplitMigration: hasLegacyInlineState,
});
return store;
return { store, configJobs };
} catch (err) {
if ((err as { code?: unknown })?.code === "ENOENT") {
serializedStoreCache.delete(storePath);
return { version: 1, jobs: [] };
return { store: { version: 1, jobs: [] }, configJobs: [] };
}
throw err;
}
}
export async function loadCronStore(storePath: string): Promise<CronStoreFile> {
return (await loadCronStoreWithConfigJobs(storePath)).store;
}
export function loadCronStoreSync(storePath: string): CronStoreFile {
try {
const raw = fs.readFileSync(storePath, "utf-8");
@@ -321,6 +394,7 @@ export function loadCronStoreSync(storePath: string): CronStoreFile {
type SaveCronStoreOptions = {
skipBackup?: boolean;
stateOnly?: boolean;
preservedConfigJobs?: PreservedCronConfigJob[];
};
async function setSecureFileMode(filePath: string): Promise<void> {
@@ -364,7 +438,11 @@ export async function saveCronStore(
opts?: SaveCronStoreOptions,
) {
const stateOnly = opts?.stateOnly === true;
const configJson = JSON.stringify(stripRuntimeOnlyCronFields(store), null, 2);
const configJson = JSON.stringify(
stripRuntimeOnlyCronFields(store, opts?.preservedConfigJobs),
null,
2,
);
const stateFile = extractStateFile(store);
const stateJson = JSON.stringify(stateFile, null, 2);