mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-12 07:20:45 +00:00
refactor: split cron startup catch-up flow
This commit is contained in:
committed by
Vincent Koc
parent
8cb688c44d
commit
1b8f800487
@@ -53,6 +53,16 @@ type TimedCronRunOutcome = CronRunOutcome &
|
||||
endedAt: number;
|
||||
};
|
||||
|
||||
type StartupCatchupCandidate = {
|
||||
jobId: string;
|
||||
job: CronJob;
|
||||
};
|
||||
|
||||
type StartupCatchupPlan = {
|
||||
candidates: StartupCatchupCandidate[];
|
||||
deferredJobIds: string[];
|
||||
};
|
||||
|
||||
export async function executeJobCoreWithTimeout(
|
||||
state: CronServiceState,
|
||||
job: CronJob,
|
||||
@@ -832,31 +842,37 @@ export async function runMissedJobs(
|
||||
state: CronServiceState,
|
||||
opts?: { skipJobIds?: ReadonlySet<string> },
|
||||
) {
|
||||
const staggerMs = Math.max(0, state.deps.missedJobStaggerMs ?? DEFAULT_MISSED_JOB_STAGGER_MS);
|
||||
const plan = await planStartupCatchup(state, opts);
|
||||
if (plan.candidates.length === 0 && plan.deferredJobIds.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const outcomes = await executeStartupCatchupPlan(state, plan);
|
||||
await applyStartupCatchupOutcomes(state, plan, outcomes);
|
||||
}
|
||||
|
||||
async function planStartupCatchup(
|
||||
state: CronServiceState,
|
||||
opts?: { skipJobIds?: ReadonlySet<string> },
|
||||
): Promise<StartupCatchupPlan> {
|
||||
const maxImmediate = Math.max(
|
||||
0,
|
||||
state.deps.maxMissedJobsPerRestart ?? DEFAULT_MAX_MISSED_JOBS_PER_RESTART,
|
||||
);
|
||||
const selection = await locked(state, async () => {
|
||||
return locked(state, async () => {
|
||||
await ensureLoaded(state, { skipRecompute: true });
|
||||
if (!state.store) {
|
||||
return {
|
||||
deferredJobIds: [] as string[],
|
||||
startupCandidates: [] as Array<{ jobId: string; job: CronJob }>,
|
||||
};
|
||||
return { candidates: [], deferredJobIds: [] };
|
||||
}
|
||||
|
||||
const now = state.deps.nowMs();
|
||||
const skipJobIds = opts?.skipJobIds;
|
||||
const missed = collectRunnableJobs(state, now, {
|
||||
skipJobIds,
|
||||
skipJobIds: opts?.skipJobIds,
|
||||
skipAtIfAlreadyRan: true,
|
||||
allowCronMissedRunByLastRun: true,
|
||||
});
|
||||
if (missed.length === 0) {
|
||||
return {
|
||||
deferredJobIds: [] as string[],
|
||||
startupCandidates: [] as Array<{ jobId: string; job: CronJob }>,
|
||||
};
|
||||
return { candidates: [], deferredJobIds: [] };
|
||||
}
|
||||
const sorted = missed.toSorted(
|
||||
(a, b) => (a.state.nextRunAtMs ?? 0) - (b.state.nextRunAtMs ?? 0),
|
||||
@@ -884,47 +900,64 @@ export async function runMissedJobs(
|
||||
job.state.lastError = undefined;
|
||||
}
|
||||
await persist(state);
|
||||
|
||||
return {
|
||||
candidates: startupCandidates.map((job) => ({ jobId: job.id, job })),
|
||||
deferredJobIds: deferred.map((job) => job.id),
|
||||
startupCandidates: startupCandidates.map((job) => ({ jobId: job.id, job })),
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
if (selection.startupCandidates.length === 0 && selection.deferredJobIds.length === 0) {
|
||||
return;
|
||||
async function executeStartupCatchupPlan(
|
||||
state: CronServiceState,
|
||||
plan: StartupCatchupPlan,
|
||||
): Promise<TimedCronRunOutcome[]> {
|
||||
const outcomes: TimedCronRunOutcome[] = [];
|
||||
for (const candidate of plan.candidates) {
|
||||
outcomes.push(await runStartupCatchupCandidate(state, candidate));
|
||||
}
|
||||
return outcomes;
|
||||
}
|
||||
|
||||
const outcomes: Array<TimedCronRunOutcome> = [];
|
||||
for (const candidate of selection.startupCandidates) {
|
||||
const startedAt = state.deps.nowMs();
|
||||
emit(state, { jobId: candidate.job.id, action: "started", runAtMs: startedAt });
|
||||
try {
|
||||
const result = await executeJobCoreWithTimeout(state, candidate.job);
|
||||
outcomes.push({
|
||||
jobId: candidate.jobId,
|
||||
status: result.status,
|
||||
error: result.error,
|
||||
summary: result.summary,
|
||||
delivered: result.delivered,
|
||||
sessionId: result.sessionId,
|
||||
sessionKey: result.sessionKey,
|
||||
model: result.model,
|
||||
provider: result.provider,
|
||||
usage: result.usage,
|
||||
startedAt,
|
||||
endedAt: state.deps.nowMs(),
|
||||
});
|
||||
} catch (err) {
|
||||
outcomes.push({
|
||||
jobId: candidate.jobId,
|
||||
status: "error",
|
||||
error: String(err),
|
||||
startedAt,
|
||||
endedAt: state.deps.nowMs(),
|
||||
});
|
||||
}
|
||||
async function runStartupCatchupCandidate(
|
||||
state: CronServiceState,
|
||||
candidate: StartupCatchupCandidate,
|
||||
): Promise<TimedCronRunOutcome> {
|
||||
const startedAt = state.deps.nowMs();
|
||||
emit(state, { jobId: candidate.job.id, action: "started", runAtMs: startedAt });
|
||||
try {
|
||||
const result = await executeJobCoreWithTimeout(state, candidate.job);
|
||||
return {
|
||||
jobId: candidate.jobId,
|
||||
status: result.status,
|
||||
error: result.error,
|
||||
summary: result.summary,
|
||||
delivered: result.delivered,
|
||||
sessionId: result.sessionId,
|
||||
sessionKey: result.sessionKey,
|
||||
model: result.model,
|
||||
provider: result.provider,
|
||||
usage: result.usage,
|
||||
startedAt,
|
||||
endedAt: state.deps.nowMs(),
|
||||
};
|
||||
} catch (err) {
|
||||
return {
|
||||
jobId: candidate.jobId,
|
||||
status: "error",
|
||||
error: String(err),
|
||||
startedAt,
|
||||
endedAt: state.deps.nowMs(),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
async function applyStartupCatchupOutcomes(
|
||||
state: CronServiceState,
|
||||
plan: StartupCatchupPlan,
|
||||
outcomes: TimedCronRunOutcome[],
|
||||
): Promise<void> {
|
||||
const staggerMs = Math.max(0, state.deps.missedJobStaggerMs ?? DEFAULT_MISSED_JOB_STAGGER_MS);
|
||||
await locked(state, async () => {
|
||||
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
|
||||
if (!state.store) {
|
||||
@@ -935,10 +968,10 @@ export async function runMissedJobs(
|
||||
applyOutcomeToStoredJob(state, result);
|
||||
}
|
||||
|
||||
if (selection.deferredJobIds.length > 0) {
|
||||
if (plan.deferredJobIds.length > 0) {
|
||||
const baseNow = state.deps.nowMs();
|
||||
let offset = staggerMs;
|
||||
for (const jobId of selection.deferredJobIds) {
|
||||
for (const jobId of plan.deferredJobIds) {
|
||||
const job = state.store.jobs.find((entry) => entry.id === jobId);
|
||||
if (!job || !job.enabled) {
|
||||
continue;
|
||||
|
||||
Reference in New Issue
Block a user