mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-16 20:40:45 +00:00
refactor: share cron manual run preflight
This commit is contained in:
@@ -360,13 +360,23 @@ type ManualRunDisposition =
|
||||
| Extract<PreparedManualRun, { ran: false }>
|
||||
| { ok: true; runnable: true };
|
||||
|
||||
type ManualRunPreflightResult =
|
||||
| { ok: false }
|
||||
| Extract<PreparedManualRun, { ran: false }>
|
||||
| {
|
||||
ok: true;
|
||||
runnable: true;
|
||||
job: CronJob;
|
||||
now: number;
|
||||
};
|
||||
|
||||
let nextManualRunId = 1;
|
||||
|
||||
async function inspectManualRunDisposition(
|
||||
async function inspectManualRunPreflight(
|
||||
state: CronServiceState,
|
||||
id: string,
|
||||
mode?: "due" | "force",
|
||||
): Promise<ManualRunDisposition | { ok: false }> {
|
||||
): Promise<ManualRunPreflightResult> {
|
||||
return await locked(state, async () => {
|
||||
warnIfDisabled(state, "run");
|
||||
await ensureLoaded(state, { skipRecompute: true });
|
||||
@@ -383,46 +393,50 @@ async function inspectManualRunDisposition(
|
||||
if (!due) {
|
||||
return { ok: true, ran: false, reason: "not-due" as const };
|
||||
}
|
||||
return { ok: true, runnable: true } as const;
|
||||
return { ok: true, runnable: true, job, now } as const;
|
||||
});
|
||||
}
|
||||
|
||||
async function inspectManualRunDisposition(
|
||||
state: CronServiceState,
|
||||
id: string,
|
||||
mode?: "due" | "force",
|
||||
): Promise<ManualRunDisposition | { ok: false }> {
|
||||
const result = await inspectManualRunPreflight(state, id, mode);
|
||||
if (!result.ok || !result.runnable) {
|
||||
return result;
|
||||
}
|
||||
return { ok: true, runnable: true } as const;
|
||||
}
|
||||
|
||||
async function prepareManualRun(
|
||||
state: CronServiceState,
|
||||
id: string,
|
||||
mode?: "due" | "force",
|
||||
): Promise<PreparedManualRun> {
|
||||
const preflight = await inspectManualRunPreflight(state, id, mode);
|
||||
if (!preflight.ok || !preflight.runnable) {
|
||||
return preflight;
|
||||
}
|
||||
return await locked(state, async () => {
|
||||
warnIfDisabled(state, "run");
|
||||
await ensureLoaded(state, { skipRecompute: true });
|
||||
// Normalize job tick state (clears stale runningAtMs markers) before
|
||||
// checking if already running, so a stale marker from a crashed Phase-1
|
||||
// persist does not block manual triggers for up to STUCK_RUN_MS (#17554).
|
||||
recomputeNextRunsForMaintenance(state);
|
||||
// Reserve this run under lock, then execute outside lock so read ops
|
||||
// (`list`, `status`) stay responsive while the run is in progress.
|
||||
const job = findJobOrThrow(state, id);
|
||||
if (typeof job.state.runningAtMs === "number") {
|
||||
return { ok: true, ran: false, reason: "already-running" as const };
|
||||
}
|
||||
const now = state.deps.nowMs();
|
||||
const due = isJobDue(job, now, { forced: mode === "force" });
|
||||
if (!due) {
|
||||
return { ok: true, ran: false, reason: "not-due" as const };
|
||||
}
|
||||
|
||||
// Reserve this run under lock, then execute outside lock so read ops
|
||||
// (`list`, `status`) stay responsive while the run is in progress.
|
||||
job.state.runningAtMs = now;
|
||||
job.state.runningAtMs = preflight.now;
|
||||
job.state.lastError = undefined;
|
||||
// Persist the running marker before releasing lock so timer ticks that
|
||||
// force-reload from disk cannot start the same job concurrently.
|
||||
await persist(state);
|
||||
emit(state, { jobId: job.id, action: "started", runAtMs: now });
|
||||
emit(state, { jobId: job.id, action: "started", runAtMs: preflight.now });
|
||||
const executionJob = JSON.parse(JSON.stringify(job)) as CronJob;
|
||||
return {
|
||||
ok: true,
|
||||
ran: true,
|
||||
jobId: job.id,
|
||||
startedAt: now,
|
||||
startedAt: preflight.now,
|
||||
executionJob,
|
||||
} as const;
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user