diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 08b4b6be206..f82290006b4 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -53,6 +53,16 @@ type TimedCronRunOutcome = CronRunOutcome & endedAt: number; }; +type StartupCatchupCandidate = { + jobId: string; + job: CronJob; +}; + +type StartupCatchupPlan = { + candidates: StartupCatchupCandidate[]; + deferredJobIds: string[]; +}; + export async function executeJobCoreWithTimeout( state: CronServiceState, job: CronJob, @@ -832,31 +842,37 @@ export async function runMissedJobs( state: CronServiceState, opts?: { skipJobIds?: ReadonlySet }, ) { - const staggerMs = Math.max(0, state.deps.missedJobStaggerMs ?? DEFAULT_MISSED_JOB_STAGGER_MS); + const plan = await planStartupCatchup(state, opts); + if (plan.candidates.length === 0 && plan.deferredJobIds.length === 0) { + return; + } + + const outcomes = await executeStartupCatchupPlan(state, plan); + await applyStartupCatchupOutcomes(state, plan, outcomes); +} + +async function planStartupCatchup( + state: CronServiceState, + opts?: { skipJobIds?: ReadonlySet }, +): Promise { const maxImmediate = Math.max( 0, state.deps.maxMissedJobsPerRestart ?? DEFAULT_MAX_MISSED_JOBS_PER_RESTART, ); - const selection = await locked(state, async () => { + return locked(state, async () => { await ensureLoaded(state, { skipRecompute: true }); if (!state.store) { - return { - deferredJobIds: [] as string[], - startupCandidates: [] as Array<{ jobId: string; job: CronJob }>, - }; + return { candidates: [], deferredJobIds: [] }; } + const now = state.deps.nowMs(); - const skipJobIds = opts?.skipJobIds; const missed = collectRunnableJobs(state, now, { - skipJobIds, + skipJobIds: opts?.skipJobIds, skipAtIfAlreadyRan: true, allowCronMissedRunByLastRun: true, }); if (missed.length === 0) { - return { - deferredJobIds: [] as string[], - startupCandidates: [] as Array<{ jobId: string; job: CronJob }>, - }; + return { candidates: [], deferredJobIds: [] }; } const sorted = missed.toSorted( (a, b) => (a.state.nextRunAtMs ?? 0) - (b.state.nextRunAtMs ?? 0), @@ -884,47 +900,64 @@ export async function runMissedJobs( job.state.lastError = undefined; } await persist(state); + return { + candidates: startupCandidates.map((job) => ({ jobId: job.id, job })), deferredJobIds: deferred.map((job) => job.id), - startupCandidates: startupCandidates.map((job) => ({ jobId: job.id, job })), }; }); +} - if (selection.startupCandidates.length === 0 && selection.deferredJobIds.length === 0) { - return; +async function executeStartupCatchupPlan( + state: CronServiceState, + plan: StartupCatchupPlan, +): Promise { + const outcomes: TimedCronRunOutcome[] = []; + for (const candidate of plan.candidates) { + outcomes.push(await runStartupCatchupCandidate(state, candidate)); } + return outcomes; +} - const outcomes: Array = []; - for (const candidate of selection.startupCandidates) { - const startedAt = state.deps.nowMs(); - emit(state, { jobId: candidate.job.id, action: "started", runAtMs: startedAt }); - try { - const result = await executeJobCoreWithTimeout(state, candidate.job); - outcomes.push({ - jobId: candidate.jobId, - status: result.status, - error: result.error, - summary: result.summary, - delivered: result.delivered, - sessionId: result.sessionId, - sessionKey: result.sessionKey, - model: result.model, - provider: result.provider, - usage: result.usage, - startedAt, - endedAt: state.deps.nowMs(), - }); - } catch (err) { - outcomes.push({ - jobId: candidate.jobId, - status: "error", - error: String(err), - startedAt, - endedAt: state.deps.nowMs(), - }); - } +async function runStartupCatchupCandidate( + state: CronServiceState, + candidate: StartupCatchupCandidate, +): Promise { + const startedAt = state.deps.nowMs(); + emit(state, { jobId: candidate.job.id, action: "started", runAtMs: startedAt }); + try { + const result = await executeJobCoreWithTimeout(state, candidate.job); + return { + jobId: candidate.jobId, + status: result.status, + error: result.error, + summary: result.summary, + delivered: result.delivered, + sessionId: result.sessionId, + sessionKey: result.sessionKey, + model: result.model, + provider: result.provider, + usage: result.usage, + startedAt, + endedAt: state.deps.nowMs(), + }; + } catch (err) { + return { + jobId: candidate.jobId, + status: "error", + error: String(err), + startedAt, + endedAt: state.deps.nowMs(), + }; } +} +async function applyStartupCatchupOutcomes( + state: CronServiceState, + plan: StartupCatchupPlan, + outcomes: TimedCronRunOutcome[], +): Promise { + const staggerMs = Math.max(0, state.deps.missedJobStaggerMs ?? DEFAULT_MISSED_JOB_STAGGER_MS); await locked(state, async () => { await ensureLoaded(state, { forceReload: true, skipRecompute: true }); if (!state.store) { @@ -935,10 +968,10 @@ export async function runMissedJobs( applyOutcomeToStoredJob(state, result); } - if (selection.deferredJobIds.length > 0) { + if (plan.deferredJobIds.length > 0) { const baseNow = state.deps.nowMs(); let offset = staggerMs; - for (const jobId of selection.deferredJobIds) { + for (const jobId of plan.deferredJobIds) { const job = state.store.jobs.find((entry) => entry.id === jobId); if (!job || !job.enabled) { continue;