diff --git a/src/commands/doctor-cron-store-migration.test.ts b/src/commands/doctor-cron-store-migration.test.ts index cfa83962484..422830bdb7b 100644 --- a/src/commands/doctor-cron-store-migration.test.ts +++ b/src/commands/doctor-cron-store-migration.test.ts @@ -170,6 +170,29 @@ describe("normalizeStoredCronJobs", () => { expect(result.jobs.map((job) => job.id)).toEqual(["valid"]); }); + it("does not normalize unsupported payload kinds into runnable cron jobs", () => { + const jobs = [ + makeLegacyJob({ + id: "legacy-command-kind", + schedule: { kind: "every", everyMs: 60_000, anchorMs: 1 }, + payload: { kind: "command", command: "echo daily" }, + }), + makeLegacyJob({ + id: "legacy-agentmessage-kind", + schedule: { kind: "cron", expr: "0 9 * * *", tz: "UTC" }, + sessionTarget: "isolated", + payload: { kind: "agentmessage", message: "summarize" }, + }), + ]; + + const result = normalizeStoredCronJobs(jobs); + + expect(result.mutated).toBe(true); + expect(result.issues.invalidPayload).toBe(2); + expect(jobs).toEqual([]); + expect(result.jobs).toEqual([]); + }); + it("normalizes whitespace-padded and non-canonical payload kinds", () => { const jobs = [ { diff --git a/src/cron/service.test-harness.ts b/src/cron/service.test-harness.ts index c5d411a83f5..41049550459 100644 --- a/src/cron/service.test-harness.ts +++ b/src/cron/service.test-harness.ts @@ -247,6 +247,7 @@ export function createMockCronStateForJobs(params: { warnedDisabled: false, warnedMissingSessionTargetJobIds: new Set(), warnedInvalidPersistedJobKeys: new Set(), + preservedInvalidPersistedJobs: [], deps: { storePath: "/mock/path", cronEnabled: true, diff --git a/src/cron/service/state.ts b/src/cron/service/state.ts index 74fa34af8ad..f5b27022334 100644 --- a/src/cron/service/state.ts +++ b/src/cron/service/state.ts @@ -1,6 +1,7 @@ import type { CronConfig } from "../../config/types.cron.js"; import type { HeartbeatRunResult, HeartbeatWakeRequest } from "../../infra/heartbeat-wake.js"; import type { DeliveryContext } from "../../utils/delivery-context.types.js"; +import type { PreservedCronConfigJob } from "../store.js"; import type { CronAgentExecutionPhaseUpdate, CronAgentExecutionStarted, @@ -168,6 +169,11 @@ export type CronServiceState = { * until doctor/fix or an explicit config write repairs the store. */ warnedInvalidPersistedJobKeys: Set; + /** + * Raw persisted config rows that are skipped for runtime safety but must not + * be deleted by routine cron-store writes. + */ + preservedInvalidPersistedJobs: PreservedCronConfigJob[]; storeLoadedAtMs: number | null; storeFileMtimeMs: number | null; }; @@ -182,6 +188,7 @@ export function createCronServiceState(deps: CronServiceDeps): CronServiceState warnedDisabled: false, warnedMissingSessionTargetJobIds: new Set(), warnedInvalidPersistedJobKeys: new Set(), + preservedInvalidPersistedJobs: [], storeLoadedAtMs: null, storeFileMtimeMs: null, }; diff --git a/src/cron/service/store.test.ts b/src/cron/service/store.test.ts index d6273c27ee5..10c89228de5 100644 --- a/src/cron/service/store.test.ts +++ b/src/cron/service/store.test.ts @@ -15,13 +15,17 @@ const { logger, makeStorePath } = setupCronServiceSuite({ const STORE_TEST_NOW = Date.parse("2026-03-23T12:00:00.000Z"); async function writeSingleJobStore(storePath: string, job: Record) { + await writeJobStore(storePath, [job]); +} + +async function writeJobStore(storePath: string, jobs: Array>) { await fs.mkdir(path.dirname(storePath), { recursive: true }); await fs.writeFile( storePath, JSON.stringify( { version: 1, - jobs: [job], + jobs, }, null, 2, @@ -130,6 +134,95 @@ describe("cron service store seam coverage", () => { expect((state.storeFileMtimeMs ?? 0) >= (firstMtime ?? 0)).toBe(true); }); + it("preserves unsupported payload-kind rows across full persistence without loading them", async () => { + const { storePath } = await makeStorePath(); + + await writeJobStore(storePath, [ + { + id: "valid-job", + name: "valid job", + enabled: true, + createdAtMs: STORE_TEST_NOW - 60_000, + updatedAtMs: STORE_TEST_NOW - 60_000, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "main", + wakeMode: "now", + payload: { kind: "systemEvent", text: "tick" }, + state: {}, + }, + { + id: "legacy-command", + name: "legacy command", + enabled: true, + createdAtMs: STORE_TEST_NOW - 60_000, + updatedAtMs: STORE_TEST_NOW - 60_000, + schedule: { kind: "cron", expr: "0 8 * * *", tz: "UTC" }, + sessionTarget: "main", + wakeMode: "now", + payload: { kind: "command", command: "echo daily" }, + state: { lastRunAtMs: STORE_TEST_NOW - 3_600_000 }, + }, + { + id: "legacy-agentmessage", + name: "legacy agentmessage", + enabled: true, + createdAtMs: STORE_TEST_NOW - 60_000, + schedule: { kind: "cron", expr: "0 9 * * *", tz: "UTC" }, + sessionTarget: "isolated", + wakeMode: "now", + payload: { kind: "agentmessage", message: "summarize" }, + metadata: { preserve: { nested: true } }, + }, + ]); + + const state = createStoreTestState(storePath); + await ensureLoaded(state, { skipRecompute: true }); + + expect(state.store?.jobs.map((job) => job.id)).toEqual(["valid-job"]); + expect(() => findJobOrThrow(state, "legacy-command")).toThrow(/unknown cron job id/); + expect(() => findJobOrThrow(state, "legacy-agentmessage")).toThrow(/unknown cron job id/); + + const valid = findJobOrThrow(state, "valid-job"); + valid.name = "valid job renamed"; + await persist(state); + + const config = JSON.parse(await fs.readFile(storePath, "utf8")) as { + jobs: Array>; + }; + expect(config.jobs.map((job) => job.id)).toEqual([ + "valid-job", + "legacy-command", + "legacy-agentmessage", + ]); + expect(config.jobs[0]?.name).toBe("valid job renamed"); + expect(config.jobs[1]).toMatchObject({ + id: "legacy-command", + payload: { kind: "command", command: "echo daily" }, + state: { lastRunAtMs: STORE_TEST_NOW - 3_600_000 }, + }); + expect(config.jobs[2]).toMatchObject({ + id: "legacy-agentmessage", + payload: { kind: "agentmessage", message: "summarize" }, + metadata: { preserve: { nested: true } }, + }); + expect(config.jobs[2]).not.toHaveProperty("state"); + expect(config.jobs[2]).not.toHaveProperty("updatedAtMs"); + + const stateFile = JSON.parse( + await fs.readFile(storePath.replace(/\.json$/, "-state.json"), "utf8"), + ) as { jobs: Record }; + expect(Object.keys(stateFile.jobs)).toEqual(["valid-job"]); + + const invalidPayloadWarns = logger.warn.mock.calls.filter((call) => { + const msg = typeof call[1] === "string" ? call[1] : ""; + return msg.includes("skipped invalid persisted job"); + }); + expect(invalidPayloadWarns.map((call) => (call[0] as { jobId?: string }).jobId)).toEqual([ + "legacy-command", + "legacy-agentmessage", + ]); + }); + it("normalizes jobId-only jobs in memory so scheduler lookups resolve by stable id", async () => { const { storePath } = await makeStorePath(); diff --git a/src/cron/service/store.ts b/src/cron/service/store.ts index 37fd1b7197a..884407730a1 100644 --- a/src/cron/service/store.ts +++ b/src/cron/service/store.ts @@ -4,7 +4,11 @@ import { normalizeCronJobInput } from "../normalize.js"; import { getInvalidPersistedCronJobReason } from "../persisted-shape.js"; import { cronSchedulingInputsEqual } from "../schedule-identity.js"; import { isInvalidCronSessionTargetIdError } from "../session-target.js"; -import { loadCronStore, saveCronStore } from "../store.js"; +import { + loadCronStoreWithConfigJobs, + saveCronStore, + type PreservedCronConfigJob, +} from "../store.js"; import type { CronJob } from "../types.js"; import { recomputeNextRuns } from "./jobs.js"; import type { CronServiceState } from "./state.js"; @@ -44,6 +48,21 @@ function warnInvalidPersistedCronJob(params: { ); } +function isRecord(value: unknown): value is Record { + return !!value && typeof value === "object" && !Array.isArray(value); +} + +function hasUnsupportedStringPayloadKind(candidate: Record): boolean { + const payload = candidate.payload; + if (!isRecord(payload)) { + return false; + } + const kind = payload.kind; + return ( + typeof kind === "string" && kind.trim() !== "" && kind !== "systemEvent" && kind !== "agentTurn" + ); +} + async function getFileMtimeMs(path: string): Promise { try { const stats = await fs.promises.stat(path); @@ -75,11 +94,13 @@ export async function ensureLoaded( // edits on filesystems with coarse mtime resolution. const fileMtimeMs = await getFileMtimeMs(state.deps.storePath); - const loaded = await loadCronStore(state.deps.storePath); - const loadedJobs = (loaded.jobs ?? []) as unknown as CronJob[]; + const loaded = await loadCronStoreWithConfigJobs(state.deps.storePath); + const loadedJobs = (loaded.store.jobs ?? []) as unknown as CronJob[]; const jobs: CronJob[] = []; + const preservedInvalidPersistedJobs: PreservedCronConfigJob[] = []; for (const [index, job] of loadedJobs.entries()) { const raw = job as unknown as Record; + const rawConfigJob = loaded.configJobs[index] ?? structuredClone(raw); const { legacyJobIdIssue } = normalizeCronJobIdentityFields(raw); let normalized: Record | null; try { @@ -100,6 +121,9 @@ export async function ensureLoaded( hydrated as unknown as Record, ); if (invalidReason) { + if (invalidReason === "invalid-payload" && hasUnsupportedStringPayloadKind(rawConfigJob)) { + preservedInvalidPersistedJobs.push({ index, job: rawConfigJob }); + } warnInvalidPersistedCronJob({ state, raw, index, reason: invalidReason }); continue; } @@ -159,6 +183,7 @@ export async function ensureLoaded( version: 1, jobs, }; + state.preservedInvalidPersistedJobs = preservedInvalidPersistedJobs; state.storeLoadedAtMs = state.deps.nowMs(); state.storeFileMtimeMs = fileMtimeMs; @@ -188,7 +213,10 @@ export async function persist( if (!state.store) { return; } - await saveCronStore(state.deps.storePath, state.store, opts); + await saveCronStore(state.deps.storePath, state.store, { + ...opts, + preservedConfigJobs: state.preservedInvalidPersistedJobs, + }); // Update file mtime after save to prevent immediate reload state.storeFileMtimeMs = await getFileMtimeMs(state.deps.storePath); } diff --git a/src/cron/store.ts b/src/cron/store.ts index 1b74b6ce016..eb5e7eed299 100644 --- a/src/cron/store.ts +++ b/src/cron/store.ts @@ -13,6 +13,16 @@ type SerializedStoreCacheEntry = { needsSplitMigration: boolean; }; +export type PreservedCronConfigJob = { + index: number; + job: Record; +}; + +export type LoadedCronStore = { + store: CronStoreFile; + configJobs: Array>; +}; + const serializedStoreCache = new Map(); function getSerializedStoreCache(storePath: string): SerializedStoreCacheEntry { @@ -66,13 +76,71 @@ function normalizeCronStoreFile(parsed: unknown): CronStoreFile { }; } -function stripRuntimeOnlyCronFields(store: CronStoreFile): unknown { +function cloneConfigJobs(jobs: Array>): Array> { + return jobs.map((job) => structuredClone(job)); +} + +function stripJobRuntimeFields(job: CronStoreFile["jobs"][number]): Record { + const { state: _state, updatedAtMs: _updatedAtMs, ...rest } = job; + return { ...rest, state: {} }; +} + +function persistedJobId(job: Record): string | null { + const id = job.id; + return typeof id === "string" && id.trim() ? id : null; +} + +function mergePreservedConfigJobs( + jobs: Array>, + preservedConfigJobs: PreservedCronConfigJob[] | undefined, +): Array> { + if (!preservedConfigJobs?.length) { + return jobs; + } + + const occupiedIds = new Set(); + for (const job of jobs) { + const id = persistedJobId(job); + if (id) { + occupiedIds.add(id); + } + } + + const seenPreservedIds = new Set(); + const preserved = preservedConfigJobs + .filter((entry) => { + const id = persistedJobId(entry.job); + if (!id) { + return true; + } + if (occupiedIds.has(id) || seenPreservedIds.has(id)) { + return false; + } + seenPreservedIds.add(id); + return true; + }) + .toSorted((a, b) => a.index - b.index); + + if (preserved.length === 0) { + return jobs; + } + + const merged = jobs.slice(); + for (const entry of preserved) { + const index = Math.max(0, Math.min(entry.index, merged.length)); + merged.splice(index, 0, { ...entry.job }); + } + return merged; +} + +function stripRuntimeOnlyCronFields( + store: CronStoreFile, + preservedConfigJobs?: PreservedCronConfigJob[], +): unknown { + const jobs = store.jobs.map(stripJobRuntimeFields); return { version: store.version, - jobs: store.jobs.map((job) => { - const { state: _state, updatedAtMs: _updatedAtMs, ...rest } = job; - return { ...rest, state: {} }; - }), + jobs: mergePreservedConfigJobs(jobs, preservedConfigJobs), }; } @@ -213,7 +281,7 @@ function mergeStateFileEntry(job: CronStoreFile["jobs"][number], entry: unknown) } } -export async function loadCronStore(storePath: string): Promise { +export async function loadCronStoreWithConfigJobs(storePath: string): Promise { try { const raw = await fs.promises.readFile(storePath, "utf-8"); let parsed: unknown; @@ -226,6 +294,7 @@ export async function loadCronStore(storePath: string): Promise { } const store = normalizeCronStoreFile(parsed); const jobs = store.jobs as unknown as Array>; + const configJobs = cloneConfigJobs(jobs); // Load state file and merge. const statePath = resolveStatePath(storePath); @@ -263,16 +332,20 @@ export async function loadCronStore(storePath: string): Promise { needsSplitMigration: hasLegacyInlineState, }); - return store; + return { store, configJobs }; } catch (err) { if ((err as { code?: unknown })?.code === "ENOENT") { serializedStoreCache.delete(storePath); - return { version: 1, jobs: [] }; + return { store: { version: 1, jobs: [] }, configJobs: [] }; } throw err; } } +export async function loadCronStore(storePath: string): Promise { + return (await loadCronStoreWithConfigJobs(storePath)).store; +} + export function loadCronStoreSync(storePath: string): CronStoreFile { try { const raw = fs.readFileSync(storePath, "utf-8"); @@ -321,6 +394,7 @@ export function loadCronStoreSync(storePath: string): CronStoreFile { type SaveCronStoreOptions = { skipBackup?: boolean; stateOnly?: boolean; + preservedConfigJobs?: PreservedCronConfigJob[]; }; async function setSecureFileMode(filePath: string): Promise { @@ -364,7 +438,11 @@ export async function saveCronStore( opts?: SaveCronStoreOptions, ) { const stateOnly = opts?.stateOnly === true; - const configJson = JSON.stringify(stripRuntimeOnlyCronFields(store), null, 2); + const configJson = JSON.stringify( + stripRuntimeOnlyCronFields(store, opts?.preservedConfigJobs), + null, + 2, + ); const stateFile = extractStateFile(store); const stateJson = JSON.stringify(stateFile, null, 2);