refactor: split cron startup catch-up flow

This commit is contained in:
Peter Steinberger
2026-03-09 06:14:34 +00:00
parent 1d301f74a6
commit e86b38f09d

View File

@@ -53,6 +53,16 @@ type TimedCronRunOutcome = CronRunOutcome &
endedAt: number;
};
type StartupCatchupCandidate = {
jobId: string;
job: CronJob;
};
type StartupCatchupPlan = {
candidates: StartupCatchupCandidate[];
deferredJobIds: string[];
};
export async function executeJobCoreWithTimeout(
state: CronServiceState,
job: CronJob,
@@ -832,31 +842,37 @@ export async function runMissedJobs(
state: CronServiceState,
opts?: { skipJobIds?: ReadonlySet<string> },
) {
const staggerMs = Math.max(0, state.deps.missedJobStaggerMs ?? DEFAULT_MISSED_JOB_STAGGER_MS);
const plan = await planStartupCatchup(state, opts);
if (plan.candidates.length === 0 && plan.deferredJobIds.length === 0) {
return;
}
const outcomes = await executeStartupCatchupPlan(state, plan);
await applyStartupCatchupOutcomes(state, plan, outcomes);
}
async function planStartupCatchup(
state: CronServiceState,
opts?: { skipJobIds?: ReadonlySet<string> },
): Promise<StartupCatchupPlan> {
const maxImmediate = Math.max(
0,
state.deps.maxMissedJobsPerRestart ?? DEFAULT_MAX_MISSED_JOBS_PER_RESTART,
);
const selection = await locked(state, async () => {
return locked(state, async () => {
await ensureLoaded(state, { skipRecompute: true });
if (!state.store) {
return {
deferredJobIds: [] as string[],
startupCandidates: [] as Array<{ jobId: string; job: CronJob }>,
};
return { candidates: [], deferredJobIds: [] };
}
const now = state.deps.nowMs();
const skipJobIds = opts?.skipJobIds;
const missed = collectRunnableJobs(state, now, {
skipJobIds,
skipJobIds: opts?.skipJobIds,
skipAtIfAlreadyRan: true,
allowCronMissedRunByLastRun: true,
});
if (missed.length === 0) {
return {
deferredJobIds: [] as string[],
startupCandidates: [] as Array<{ jobId: string; job: CronJob }>,
};
return { candidates: [], deferredJobIds: [] };
}
const sorted = missed.toSorted(
(a, b) => (a.state.nextRunAtMs ?? 0) - (b.state.nextRunAtMs ?? 0),
@@ -884,47 +900,64 @@ export async function runMissedJobs(
job.state.lastError = undefined;
}
await persist(state);
return {
candidates: startupCandidates.map((job) => ({ jobId: job.id, job })),
deferredJobIds: deferred.map((job) => job.id),
startupCandidates: startupCandidates.map((job) => ({ jobId: job.id, job })),
};
});
}
if (selection.startupCandidates.length === 0 && selection.deferredJobIds.length === 0) {
return;
async function executeStartupCatchupPlan(
state: CronServiceState,
plan: StartupCatchupPlan,
): Promise<TimedCronRunOutcome[]> {
const outcomes: TimedCronRunOutcome[] = [];
for (const candidate of plan.candidates) {
outcomes.push(await runStartupCatchupCandidate(state, candidate));
}
return outcomes;
}
const outcomes: Array<TimedCronRunOutcome> = [];
for (const candidate of selection.startupCandidates) {
const startedAt = state.deps.nowMs();
emit(state, { jobId: candidate.job.id, action: "started", runAtMs: startedAt });
try {
const result = await executeJobCoreWithTimeout(state, candidate.job);
outcomes.push({
jobId: candidate.jobId,
status: result.status,
error: result.error,
summary: result.summary,
delivered: result.delivered,
sessionId: result.sessionId,
sessionKey: result.sessionKey,
model: result.model,
provider: result.provider,
usage: result.usage,
startedAt,
endedAt: state.deps.nowMs(),
});
} catch (err) {
outcomes.push({
jobId: candidate.jobId,
status: "error",
error: String(err),
startedAt,
endedAt: state.deps.nowMs(),
});
}
async function runStartupCatchupCandidate(
state: CronServiceState,
candidate: StartupCatchupCandidate,
): Promise<TimedCronRunOutcome> {
const startedAt = state.deps.nowMs();
emit(state, { jobId: candidate.job.id, action: "started", runAtMs: startedAt });
try {
const result = await executeJobCoreWithTimeout(state, candidate.job);
return {
jobId: candidate.jobId,
status: result.status,
error: result.error,
summary: result.summary,
delivered: result.delivered,
sessionId: result.sessionId,
sessionKey: result.sessionKey,
model: result.model,
provider: result.provider,
usage: result.usage,
startedAt,
endedAt: state.deps.nowMs(),
};
} catch (err) {
return {
jobId: candidate.jobId,
status: "error",
error: String(err),
startedAt,
endedAt: state.deps.nowMs(),
};
}
}
async function applyStartupCatchupOutcomes(
state: CronServiceState,
plan: StartupCatchupPlan,
outcomes: TimedCronRunOutcome[],
): Promise<void> {
const staggerMs = Math.max(0, state.deps.missedJobStaggerMs ?? DEFAULT_MISSED_JOB_STAGGER_MS);
await locked(state, async () => {
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
if (!state.store) {
@@ -935,10 +968,10 @@ export async function runMissedJobs(
applyOutcomeToStoredJob(state, result);
}
if (selection.deferredJobIds.length > 0) {
if (plan.deferredJobIds.length > 0) {
const baseNow = state.deps.nowMs();
let offset = staggerMs;
for (const jobId of selection.deferredJobIds) {
for (const jobId of plan.deferredJobIds) {
const job = state.store.jobs.find((entry) => entry.id === jobId);
if (!job || !job.enabled) {
continue;