Files
openclaw/src/cron/service/timer.ts
Peter Steinberger c6ee68b751 Reapply "refactor: move runtime state to SQLite"
This reverts commit 694ca50e97.
2026-05-28 00:46:31 +01:00

1955 lines
61 KiB
TypeScript

import { formatEmbeddedAgentExecutionPhase } from "../../agents/embedded-agent-runner/execution-phase.js";
import { resolveFailoverReasonFromError } from "../../agents/failover-error.js";
import { formatEmbeddedAgentExecutionPhase } from "../../agents/pi-embedded-runner/execution-phase.js";
import type { CronConfig, CronRetryOn } from "../../config/types.cron.js";
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
import {
HEARTBEAT_SKIP_CRON_IN_PROGRESS,
isRetryableHeartbeatBusySkipReason,
} from "../../infra/heartbeat-wake.js";
import { isSubagentSessionKey } from "../../routing/session-key.js";
import { normalizeOptionalLowercaseString } from "../../shared/string-coerce.js";
import {
completeTaskRunByRunId,
createRunningTaskRun,
failTaskRunByRunId,
} from "../../tasks/detached-task-runtime.js";
import { clearCronJobActive, markCronJobActive } from "../active-jobs.js";
import { resolveCronDeliveryPlan, resolveFailureDestination } from "../delivery-plan.js";
import { resolveCronAgentSessionKey } from "../isolated-agent/session-key.js";
import { resolveCronExecutionRetryHint } from "../retry-hint.js";
import {
createCronRunDiagnosticsFromError,
normalizeCronRunDiagnostics,
summarizeCronRunDiagnostics,
} from "../run-diagnostics.js";
import { createCronExecutionId } from "../run-id.js";
import type {
CronAgentExecutionPhase,
CronAgentExecutionPhaseUpdate,
CronAgentExecutionStarted,
CronDeliveryStatus,
CronDeliveryTrace,
CronFailureNotificationDelivery,
CronJob,
CronMessageChannel,
CronRunOutcome,
CronRunStatus,
CronRunTelemetry,
} from "../types.js";
import {
DEFAULT_ERROR_BACKOFF_SCHEDULE_MS,
computeJobPreviousRunAtMs,
computeJobNextRunAtMs,
errorBackoffMs,
hasScheduledNextRunAtMs,
isJobEnabled,
nextWakeAtMs,
recomputeNextRunsForMaintenance,
recordScheduleComputeError,
resolveJobPayloadTextForMain,
} from "./jobs.js";
import { locked } from "./locked.js";
import type { CronEvent, CronServiceState } from "./state.js";
import { ensureLoaded, persist } from "./store.js";
import { CRON_TASK_RUNNING_PROGRESS_SUMMARY } from "./task-ledger.js";
import { resolveCronJobTimeoutMs } from "./timeout-policy.js";
export { DEFAULT_JOB_TIMEOUT_MS } from "./timeout-policy.js";
const MAX_TIMER_DELAY_MS = 60_000;
const CRON_TIMEOUT_CLEANUP_GUARD_MS = 20_000;
const CRON_AGENT_SETUP_WATCHDOG_MS = 60_000;
const CRON_AGENT_PRE_EXECUTION_WATCHDOG_MS = 60_000;
const CRON_AGENT_PRE_EXECUTION_MIN_WATCHDOG_MS = 1_000;
/**
* Minimum gap between consecutive fires of the same cron job. This is a
* safety net that prevents spin-loops when `computeJobNextRunAtMs` returns
* a value within the same second as the just-completed run. The guard
* is intentionally generous (2 s) so it never masks a legitimate schedule
* but always breaks an infinite re-trigger cycle. (See #17821)
*/
const MIN_REFIRE_GAP_MS = 2_000;
const DEFAULT_MISSED_JOB_STAGGER_MS = 5_000;
const DEFAULT_MAX_MISSED_JOBS_PER_RESTART = 5;
const DEFAULT_STARTUP_DEFERRED_MISSED_AGENT_JOB_DELAY_MS = 2 * 60_000;
const DEFAULT_FAILURE_ALERT_AFTER = 2;
const DEFAULT_FAILURE_ALERT_COOLDOWN_MS = 60 * 60_000; // 1 hour
type ResolvedFailureAlert = {
after: number;
cooldownMs: number;
channel: CronMessageChannel;
to?: string;
mode?: "announce" | "webhook";
accountId?: string;
includeSkipped: boolean;
};
type TimedCronRunOutcome = CronRunOutcome &
CronRunTelemetry & {
jobId: string;
job: CronJob;
taskRunId?: string;
delivered?: boolean;
deliveryAttempted?: boolean;
startedAt: number;
endedAt: number;
};
type StartupCatchupCandidate = {
jobId: string;
job: CronJob;
};
type StartupDeferredJob = {
jobId: string;
delayMs?: number;
};
type StartupCatchupPlan = {
candidates: StartupCatchupCandidate[];
deferredJobs: StartupDeferredJob[];
};
type CronAgentWatchdogState =
| "waiting_for_runner"
| "waiting_for_execution"
| "executing"
| "timed_out"
| "disposed";
type CronAgentPhaseWatchdogStage = "pre_execution" | "execution";
const CRON_AGENT_PHASE_WATCHDOG_STAGE = {
runner_entered: "pre_execution",
workspace: "pre_execution",
runtime_plugins: "pre_execution",
before_agent_reply: "execution",
model_resolution: "pre_execution",
auth: "pre_execution",
context_engine: "pre_execution",
attempt_dispatch: "execution",
context_assembled: "execution",
turn_accepted: "execution",
process_spawned: "execution",
tool_execution_started: "execution",
assistant_output_started: "execution",
model_call_started: "execution",
} as const satisfies Record<CronAgentExecutionPhase, CronAgentPhaseWatchdogStage>;
type CronAgentWatchdog = {
start: () => void;
noteRunnerStarted: (info?: CronAgentExecutionStarted) => void;
notePhase: (info: CronAgentExecutionPhaseUpdate) => void;
activeExecution: () => CronAgentExecutionStarted | undefined;
dispose: () => void;
};
export async function executeJobCoreWithTimeout(
state: CronServiceState,
job: CronJob,
): Promise<Awaited<ReturnType<typeof executeJobCore>>> {
const jobTimeoutMs = resolveCronJobTimeoutMs(job);
if (typeof jobTimeoutMs !== "number") {
return await executeJobCore(state, job);
}
const runAbortController = new AbortController();
let timeoutReason: string | undefined;
const timeoutMarker = Symbol("cron-timeout");
let resolveTimeout: ((value: typeof timeoutMarker) => void) | undefined;
const timeoutPromise = new Promise<typeof timeoutMarker>((resolve) => {
resolveTimeout = resolve;
});
const deferTimeoutUntilExecutionStart =
job.sessionTarget !== "main" && job.payload.kind === "agentTurn";
const triggerTimeout = (reason: string) => {
if (runAbortController.signal.aborted) {
return;
}
timeoutReason = reason;
runAbortController.abort(reason);
resolveTimeout?.(timeoutMarker);
};
const watchdog = createCronAgentWatchdog({
deferUntilRunner: deferTimeoutUntilExecutionStart,
jobTimeoutMs,
triggerTimeout,
});
const corePromise = executeJobCore(state, job, runAbortController.signal, {
onExecutionStarted: deferTimeoutUntilExecutionStart ? watchdog.noteRunnerStarted : undefined,
onExecutionPhase: deferTimeoutUntilExecutionStart ? watchdog.notePhase : undefined,
});
watchdog.start();
void corePromise.catch((err) => {
if (runAbortController.signal.aborted) {
state.deps.log.warn(
{ jobId: job.id, err: String(err) },
"cron: job core rejected after timeout abort",
);
}
});
try {
const first = await Promise.race([corePromise, timeoutPromise]);
if (first !== timeoutMarker) {
return first;
}
const activeExecution = watchdog.activeExecution();
await cleanupTimedOutCronAgentRun(state, job, jobTimeoutMs, activeExecution);
const error = timeoutReason ?? timeoutErrorMessage(activeExecution);
return {
status: "error",
error,
diagnostics: createCronRunDiagnosticsFromError("cron-setup", error, {
nowMs: state.deps.nowMs,
}),
};
} finally {
watchdog.dispose();
}
}
function createCronAgentWatchdog(params: {
deferUntilRunner: boolean;
jobTimeoutMs: number;
triggerTimeout: (reason: string) => void;
}): CronAgentWatchdog {
let state: CronAgentWatchdogState = params.deferUntilRunner ? "waiting_for_runner" : "executing";
let timeoutId: NodeJS.Timeout | undefined;
let setupTimeoutId: NodeJS.Timeout | undefined;
let preExecutionTimeoutId: NodeJS.Timeout | undefined;
let activeExecution: CronAgentExecutionStarted | undefined;
const setTimedOut = (reason: string) => {
if (state === "timed_out" || state === "disposed") {
return;
}
state = "timed_out";
params.triggerTimeout(reason);
};
const startTimeout = () => {
if (timeoutId || state === "disposed") {
return;
}
timeoutId = setTimeout(() => {
setTimedOut(timeoutErrorMessage(activeExecution));
}, params.jobTimeoutMs);
};
const clearSetupTimeout = () => {
if (!setupTimeoutId) {
return;
}
clearTimeout(setupTimeoutId);
setupTimeoutId = undefined;
};
const clearPreExecutionTimeout = () => {
if (!preExecutionTimeoutId) {
return;
}
clearTimeout(preExecutionTimeoutId);
preExecutionTimeoutId = undefined;
};
const startPreExecutionTimeout = () => {
if (preExecutionTimeoutId || state !== "waiting_for_execution") {
return;
}
preExecutionTimeoutId = setTimeout(() => {
if (state === "waiting_for_execution") {
setTimedOut(preExecutionTimeoutErrorMessage(activeExecution));
}
}, resolveCronAgentPreExecutionWatchdogMs(params.jobTimeoutMs));
};
const noteExecutionProgress = (info?: CronAgentExecutionStarted) => {
if (!info) {
return;
}
const previousPhase = activeExecution?.phase;
activeExecution = { ...activeExecution, ...info };
const stage = info.phase ? CRON_AGENT_PHASE_WATCHDOG_STAGE[info.phase] : undefined;
if (
state === "executing" &&
previousPhase === "before_agent_reply" &&
stage === "pre_execution"
) {
state = "waiting_for_execution";
startPreExecutionTimeout();
return;
}
if (stage === "execution" || info.firstModelCallStarted) {
state = "executing";
clearPreExecutionTimeout();
}
};
return {
start: () => {
if (params.deferUntilRunner) {
setupTimeoutId = setTimeout(() => {
if (state === "waiting_for_runner") {
setTimedOut(setupTimeoutErrorMessage(activeExecution));
}
}, CRON_AGENT_SETUP_WATCHDOG_MS);
return;
}
startTimeout();
},
noteRunnerStarted: (info?: CronAgentExecutionStarted) => {
if (state === "disposed" || state === "timed_out") {
return;
}
clearSetupTimeout();
startTimeout();
if (state !== "executing") {
state = "waiting_for_execution";
}
noteExecutionProgress(info);
startPreExecutionTimeout();
},
notePhase: (info: CronAgentExecutionPhaseUpdate) => {
if (state === "disposed" || state === "timed_out") {
return;
}
noteExecutionProgress(info);
},
activeExecution: () => activeExecution,
dispose: () => {
state = "disposed";
if (timeoutId) {
clearTimeout(timeoutId);
}
clearSetupTimeout();
clearPreExecutionTimeout();
},
};
}
async function cleanupTimedOutCronAgentRun(
state: CronServiceState,
job: CronJob,
timeoutMs: number,
execution?: CronAgentExecutionStarted,
): Promise<void> {
if (!state.deps.cleanupTimedOutAgentRun) {
return;
}
let settleTimer: NodeJS.Timeout | undefined;
const cleanupPromise = state.deps.cleanupTimedOutAgentRun({ job, timeoutMs, execution });
const settleTimeout = new Promise<void>((resolve) => {
settleTimer = setTimeout(resolve, CRON_TIMEOUT_CLEANUP_GUARD_MS);
});
try {
await Promise.race([cleanupPromise, settleTimeout]);
} catch (err) {
state.deps.log.warn(
{ jobId: job.id, err: String(err) },
"cron: timed-out agent cleanup failed",
);
} finally {
if (settleTimer) {
clearTimeout(settleTimer);
}
}
}
function resolveRunConcurrency(state: CronServiceState): number {
const raw = state.deps.cronConfig?.maxConcurrentRuns;
if (typeof raw !== "number" || !Number.isFinite(raw)) {
return 1;
}
return Math.max(1, Math.floor(raw));
}
function timeoutErrorMessage(execution?: CronAgentExecutionStarted): string {
const phase = formatCronAgentExecutionPhase(execution);
if (!phase) {
return "cron: job execution timed out";
}
return `cron: job execution timed out (last phase: ${phase})`;
}
function setupTimeoutErrorMessage(execution?: CronAgentExecutionStarted): string {
const phase = formatCronAgentExecutionPhase(execution);
if (!phase) {
return "cron: isolated agent setup timed out before runner start";
}
return `cron: isolated agent setup timed out before runner start (last phase: ${phase})`;
}
function preExecutionTimeoutErrorMessage(execution?: CronAgentExecutionStarted): string {
const phase = formatCronAgentExecutionPhase(execution);
if (!phase) {
return "cron: isolated agent run stalled before execution start";
}
return `cron: isolated agent run stalled before execution start (last phase: ${phase})`;
}
function formatCronAgentExecutionPhase(execution?: CronAgentExecutionStarted): string | undefined {
return formatEmbeddedAgentExecutionPhase(execution?.phase);
}
function resolveCronAgentPreExecutionWatchdogMs(jobTimeoutMs: number): number {
return Math.max(
CRON_AGENT_PRE_EXECUTION_MIN_WATCHDOG_MS,
Math.min(CRON_AGENT_PRE_EXECUTION_WATCHDOG_MS, Math.floor(jobTimeoutMs / 2)),
);
}
function abortErrorMessage(signal?: AbortSignal): string {
const reason = signal?.reason;
if (typeof reason === "string" && reason.trim()) {
return reason.trim();
}
return timeoutErrorMessage();
}
function isAbortError(err: unknown): boolean {
if (!(err instanceof Error)) {
return false;
}
return err.name === "AbortError" || err.message === timeoutErrorMessage();
}
export function normalizeCronRunErrorText(err: unknown): string {
if (isAbortError(err)) {
return timeoutErrorMessage();
}
if (typeof err === "string") {
return err === `Error: ${timeoutErrorMessage()}` ? timeoutErrorMessage() : err;
}
return String(err);
}
function resolveCronTaskChildSessionKey(params: {
state: CronServiceState;
job: CronJob;
}): string | undefined {
const explicitSessionKey = params.job.sessionKey?.trim();
if (explicitSessionKey) {
return explicitSessionKey;
}
if (params.job.sessionTarget !== "isolated") {
return undefined;
}
return resolveCronAgentSessionKey({
sessionKey: `cron:${params.job.id}`,
agentId: params.job.agentId ?? params.state.deps.defaultAgentId ?? DEFAULT_AGENT_ID,
});
}
function tryCreateCronTaskRun(params: {
state: CronServiceState;
job: CronJob;
startedAt: number;
}): string | undefined {
const runId = createCronExecutionId(params.job.id, params.startedAt);
try {
createRunningTaskRun({
runtime: "cron",
sourceId: params.job.id,
ownerKey: "",
scopeKind: "system",
childSessionKey: resolveCronTaskChildSessionKey(params),
agentId: params.job.agentId,
runId,
label: params.job.name,
task: params.job.name || params.job.id,
deliveryStatus: "not_applicable",
notifyPolicy: "silent",
startedAt: params.startedAt,
lastEventAt: params.startedAt,
progressSummary: CRON_TASK_RUNNING_PROGRESS_SUMMARY,
});
return runId;
} catch (error) {
params.state.deps.log.warn(
{ jobId: params.job.id, error },
"cron: failed to create task ledger record",
);
return undefined;
}
}
function tryFinishCronTaskRun(
state: CronServiceState,
result: Pick<TimedCronRunOutcome, "taskRunId" | "status" | "error" | "endedAt" | "summary">,
): void {
if (!result.taskRunId) {
return;
}
try {
if (result.status === "ok" || result.status === "skipped") {
completeTaskRunByRunId({
runId: result.taskRunId,
runtime: "cron",
endedAt: result.endedAt,
lastEventAt: result.endedAt,
terminalSummary: result.summary ?? undefined,
});
return;
}
failTaskRunByRunId({
runId: result.taskRunId,
runtime: "cron",
status:
normalizeCronRunErrorText(result.error) === timeoutErrorMessage() ? "timed_out" : "failed",
endedAt: result.endedAt,
lastEventAt: result.endedAt,
error: result.status === "error" ? normalizeCronRunErrorText(result.error) : undefined,
terminalSummary: result.summary ?? undefined,
});
} catch (error) {
state.deps.log.warn(
{ runId: result.taskRunId, jobStatus: result.status, error },
"cron: failed to update task ledger record",
);
}
}
/** Default max retries for one-shot jobs on transient errors (#24355). */
const DEFAULT_MAX_TRANSIENT_RETRIES = 3;
function resolveCronNextRunWithLowerBound(params: {
state: CronServiceState;
job: CronJob;
naturalNext: number | undefined;
lowerBoundMs: number;
context: "completion" | "error_backoff";
}): number | undefined {
if (params.naturalNext === undefined) {
params.state.deps.log.warn(
{
jobId: params.job.id,
jobName: params.job.name,
context: params.context,
},
"cron: next run unresolved; clearing schedule to avoid a refire loop",
);
return undefined;
}
return Math.max(params.naturalNext, params.lowerBoundMs);
}
function resolveRetryConfig(cronConfig?: CronConfig) {
const retry = cronConfig?.retry;
return {
maxAttempts:
typeof retry?.maxAttempts === "number" ? retry.maxAttempts : DEFAULT_MAX_TRANSIENT_RETRIES,
backoffMs:
Array.isArray(retry?.backoffMs) && retry.backoffMs.length > 0
? retry.backoffMs
: DEFAULT_ERROR_BACKOFF_SCHEDULE_MS.slice(0, 3),
retryOn: Array.isArray(retry?.retryOn) && retry.retryOn.length > 0 ? retry.retryOn : undefined,
};
}
function resolveDeliveryState(params: {
job: CronJob;
runStatus: CronRunStatus;
delivered?: boolean;
error?: string;
globalFailureDestination?: CronConfig["failureDestination"];
}): {
delivered?: boolean;
status: CronDeliveryStatus;
error?: string;
failureNotification: CronFailureNotificationDelivery;
} {
const primaryDeliveryRequested = resolveCronDeliveryPlan(params.job).requested;
const alternateFailureNotificationRequested =
params.runStatus === "error" &&
params.job.delivery?.bestEffort !== true &&
resolveFailureDestination(params.job, params.globalFailureDestination) !== null;
if (!primaryDeliveryRequested) {
return {
status: "not-requested",
failureNotification: {
status: alternateFailureNotificationRequested ? "unknown" : "not-requested",
},
};
}
if (params.runStatus === "error") {
const failureNotification: CronFailureNotificationDelivery =
alternateFailureNotificationRequested ? { status: "unknown" } : { status: "delivered" };
if (params.delivered === true) {
return {
delivered: false,
status: "not-delivered",
error: params.error,
failureNotification: alternateFailureNotificationRequested
? failureNotification
: { delivered: true, status: "delivered" },
};
}
if (params.delivered === false) {
return {
delivered: false,
status: "not-delivered",
error: params.error,
failureNotification: alternateFailureNotificationRequested
? failureNotification
: {
delivered: false,
status: "not-delivered",
...(params.error ? { error: params.error } : {}),
},
};
}
return {
status: "unknown",
error: params.error,
failureNotification: { status: "unknown" },
};
}
if (params.delivered === true) {
return {
delivered: true,
status: "delivered",
failureNotification: { status: "not-requested" },
};
}
if (params.delivered === false) {
return {
delivered: false,
status: "not-delivered",
error: params.error,
failureNotification: { status: "not-requested" },
};
}
return { status: "unknown", failureNotification: { status: "not-requested" } };
}
export function failureNotificationDeliveryFromJobState(
job: CronJob,
): CronFailureNotificationDelivery | undefined {
const status = job.state.lastFailureNotificationDeliveryStatus;
if (!status || status === "not-requested") {
return undefined;
}
return {
delivered: job.state.lastFailureNotificationDelivered,
status,
error: job.state.lastFailureNotificationDeliveryError,
};
}
function normalizeCronMessageChannel(input: unknown): CronMessageChannel | undefined {
const channel = normalizeOptionalLowercaseString(input);
return channel ? (channel as CronMessageChannel) : undefined;
}
function normalizeTo(input: unknown): string | undefined {
if (typeof input !== "string") {
return undefined;
}
const to = input.trim();
return to ? to : undefined;
}
function clampPositiveInt(value: unknown, fallback: number): number {
if (typeof value !== "number" || !Number.isFinite(value)) {
return fallback;
}
const floored = Math.floor(value);
return floored >= 1 ? floored : fallback;
}
function clampNonNegativeInt(value: unknown, fallback: number): number {
if (typeof value !== "number" || !Number.isFinite(value)) {
return fallback;
}
const floored = Math.floor(value);
return floored >= 0 ? floored : fallback;
}
function resolveFailureAlert(state: CronServiceState, job: CronJob): ResolvedFailureAlert | null {
const globalConfig = state.deps.cronConfig?.failureAlert;
const jobConfig = job.failureAlert === false ? undefined : job.failureAlert;
if (job.failureAlert === false) {
return null;
}
if (!jobConfig && globalConfig?.enabled !== true) {
return null;
}
const mode = jobConfig?.mode ?? globalConfig?.mode;
const explicitTo = normalizeTo(jobConfig?.to);
return {
after: clampPositiveInt(jobConfig?.after ?? globalConfig?.after, DEFAULT_FAILURE_ALERT_AFTER),
cooldownMs: clampNonNegativeInt(
jobConfig?.cooldownMs ?? globalConfig?.cooldownMs,
DEFAULT_FAILURE_ALERT_COOLDOWN_MS,
),
channel:
normalizeCronMessageChannel(jobConfig?.channel) ??
normalizeCronMessageChannel(job.delivery?.channel) ??
"last",
to: mode === "webhook" ? explicitTo : (explicitTo ?? normalizeTo(job.delivery?.to)),
mode,
accountId: jobConfig?.accountId ?? globalConfig?.accountId,
includeSkipped: jobConfig?.includeSkipped ?? globalConfig?.includeSkipped ?? false,
};
}
function emitFailureAlert(
state: CronServiceState,
params: {
job: CronJob;
error?: string;
consecutiveErrors: number;
channel: CronMessageChannel;
to?: string;
mode?: "announce" | "webhook";
accountId?: string;
status: "error" | "skipped";
provider?: string;
},
) {
const safeJobName = params.job.name || params.job.id;
const truncatedError = (params.error?.trim() || "unknown reason").slice(0, 200);
const errorReason =
params.status === "error" && typeof params.error === "string"
? (resolveFailoverReasonFromError(params.error, params.provider) ?? undefined)
: undefined;
const statusVerb = params.status === "skipped" ? "skipped" : "failed";
const detailLabel = params.status === "skipped" ? "Skip reason" : "Last error";
const text = [
`Cron job "${safeJobName}" ${statusVerb} ${params.consecutiveErrors} times`,
...(errorReason ? [`Cause: ${errorReason}`] : []),
`${detailLabel}: ${truncatedError}`,
].join("\n");
if (state.deps.sendCronFailureAlert) {
void state.deps
.sendCronFailureAlert({
job: params.job,
text,
channel: params.channel,
to: params.to,
mode: params.mode,
accountId: params.accountId,
})
.catch((err) => {
state.deps.log.warn(
{ jobId: params.job.id, err: String(err) },
"cron: failure alert delivery failed",
);
});
return;
}
state.deps.enqueueSystemEvent(text, { agentId: params.job.agentId });
if (params.job.wakeMode === "now") {
state.deps.requestHeartbeat({
source: "cron",
intent: "immediate",
reason: `cron:${params.job.id}:failure-alert`,
});
}
}
function maybeEmitFailureAlert(
state: CronServiceState,
params: {
job: CronJob;
alertConfig: ResolvedFailureAlert | null;
status: "error" | "skipped";
error?: string;
provider?: string;
consecutiveCount: number;
},
) {
if (!params.alertConfig || params.consecutiveCount < params.alertConfig.after) {
return;
}
const isBestEffort = params.job.delivery?.bestEffort === true;
if (isBestEffort) {
return;
}
const now = state.deps.nowMs();
const lastAlert = params.job.state.lastFailureAlertAtMs;
const inCooldown =
typeof lastAlert === "number" && now - lastAlert < Math.max(0, params.alertConfig.cooldownMs);
if (inCooldown) {
return;
}
emitFailureAlert(state, {
job: params.job,
error: params.error,
consecutiveErrors: params.consecutiveCount,
channel: params.alertConfig.channel,
to: params.alertConfig.to,
mode: params.alertConfig.mode,
accountId: params.alertConfig.accountId,
status: params.status,
provider: params.provider,
});
params.job.state.lastFailureAlertAtMs = now;
}
/**
* Apply the result of a job execution to the job's state.
* Handles consecutive error tracking, exponential backoff, one-shot disable,
* and nextRunAtMs computation. Returns `true` if the job should be deleted.
*/
export function applyJobResult(
state: CronServiceState,
job: CronJob,
result: {
status: CronRunStatus;
error?: string;
diagnostics?: CronRunOutcome["diagnostics"];
delivered?: boolean;
provider?: string;
startedAt: number;
endedAt: number;
},
opts?: {
// Preserve recurring "every" anchors for manual force runs.
preserveSchedule?: boolean;
},
): boolean {
const prevLastRunAtMs = job.state.lastRunAtMs;
const computeNextWithPreservedLastRun = (nowMs: number) => {
const saved = job.state.lastRunAtMs;
job.state.lastRunAtMs = prevLastRunAtMs;
try {
return computeJobNextRunAtMs(job, nowMs);
} finally {
job.state.lastRunAtMs = saved;
}
};
job.state.runningAtMs = undefined;
job.state.lastRunAtMs = result.startedAt;
job.state.lastRunStatus = result.status;
job.state.lastStatus = result.status;
job.state.lastDurationMs = Math.max(0, result.endedAt - result.startedAt);
job.state.lastError = result.error;
job.state.lastDiagnostics = normalizeCronRunDiagnostics(result.diagnostics);
job.state.lastDiagnosticSummary = summarizeCronRunDiagnostics(job.state.lastDiagnostics);
job.state.lastErrorReason =
result.status === "error" && typeof result.error === "string"
? (resolveFailoverReasonFromError(result.error, result.provider) ?? undefined)
: undefined;
if (result.status === "error") {
state.deps.log.warn(
{
jobId: job.id,
jobName: job.name,
error: result.error,
diagnosticsSummary: job.state.lastDiagnosticSummary,
},
"cron: job run returned error status",
);
}
const deliveryState = resolveDeliveryState({
job,
runStatus: result.status,
delivered: result.delivered,
error: result.error,
globalFailureDestination: state.deps.cronConfig?.failureDestination,
});
job.state.lastDelivered = deliveryState.delivered;
job.state.lastDeliveryStatus = deliveryState.status;
job.state.lastDeliveryError =
deliveryState.status === "not-delivered" && deliveryState.error
? deliveryState.error
: undefined;
job.state.lastFailureNotificationDelivered = deliveryState.failureNotification.delivered;
job.state.lastFailureNotificationDeliveryStatus = deliveryState.failureNotification.status;
job.state.lastFailureNotificationDeliveryError = deliveryState.failureNotification.error;
job.updatedAtMs = result.endedAt;
// Track consecutive errors for backoff / auto-disable; skipped runs use a
// separate counter so opt-in skip alerts do not affect retry behavior.
const alertConfig = resolveFailureAlert(state, job);
if (result.status === "error") {
job.state.consecutiveErrors = (job.state.consecutiveErrors ?? 0) + 1;
job.state.consecutiveSkipped = 0;
maybeEmitFailureAlert(state, {
job,
alertConfig,
status: "error",
error: result.error,
provider: result.provider,
consecutiveCount: job.state.consecutiveErrors,
});
} else if (result.status === "skipped") {
job.state.consecutiveErrors = 0;
job.state.consecutiveSkipped = (job.state.consecutiveSkipped ?? 0) + 1;
if (alertConfig?.includeSkipped) {
maybeEmitFailureAlert(state, {
job,
alertConfig,
status: "skipped",
error: result.error,
provider: result.provider,
consecutiveCount: job.state.consecutiveSkipped,
});
} else {
job.state.lastFailureAlertAtMs = undefined;
}
} else {
job.state.consecutiveErrors = 0;
job.state.consecutiveSkipped = 0;
job.state.lastFailureAlertAtMs = undefined;
}
const shouldDelete =
job.schedule.kind === "at" && job.deleteAfterRun === true && result.status === "ok";
if (!shouldDelete) {
if (job.schedule.kind === "at") {
if (result.status === "ok" || result.status === "skipped") {
// One-shot done or skipped: disable to prevent tight-loop (#11452).
job.enabled = false;
job.state.nextRunAtMs = undefined;
} else if (result.status === "error") {
const retryConfig = resolveRetryConfig(state.deps.cronConfig);
const retryHint = resolveCronExecutionRetryHint(
result.error,
retryConfig.retryOn,
job.state.lastErrorReason,
);
// consecutiveErrors is always set to ≥1 by the increment block above.
const consecutive = job.state.consecutiveErrors;
if (retryHint.retryable && consecutive <= retryConfig.maxAttempts) {
// Schedule retry with backoff (#24355).
const backoff = errorBackoffMs(consecutive, retryConfig.backoffMs);
job.state.nextRunAtMs = result.endedAt + backoff;
state.deps.log.info(
{
jobId: job.id,
jobName: job.name,
consecutiveErrors: consecutive,
backoffMs: backoff,
nextRunAtMs: job.state.nextRunAtMs,
},
"cron: scheduling one-shot retry after transient error",
);
} else {
// Permanent error or max retries exhausted: disable.
// Note: deleteAfterRun:true only triggers on ok (see shouldDelete above),
// so exhausted-retry jobs are disabled but intentionally kept in the store
// to preserve the error state for inspection.
job.enabled = false;
job.state.nextRunAtMs = undefined;
state.deps.log.warn(
{
jobId: job.id,
jobName: job.name,
consecutiveErrors: consecutive,
error: result.error,
reason: retryHint.retryable ? "max retries exhausted" : "permanent error",
retryCategory: retryHint.category,
},
"cron: disabling one-shot job after error",
);
}
}
} else if (result.status === "error" && isJobEnabled(job)) {
// Apply exponential backoff for errored jobs to prevent retry storms.
const backoff = errorBackoffMs(job.state.consecutiveErrors ?? 1);
let normalNext: number | undefined;
try {
normalNext =
opts?.preserveSchedule && job.schedule.kind === "every"
? computeNextWithPreservedLastRun(result.endedAt)
: computeJobNextRunAtMs(job, result.endedAt);
} catch (err) {
// If the schedule expression/timezone throws (croner edge cases),
// record the schedule error (auto-disables after repeated failures)
// and fall back to backoff-only schedule so the state update is not lost.
recordScheduleComputeError({ state, job, err });
}
const backoffNext = result.endedAt + backoff;
// Use whichever is later: the natural next run or the backoff delay.
job.state.nextRunAtMs =
job.schedule.kind === "cron"
? resolveCronNextRunWithLowerBound({
state,
job,
naturalNext: normalNext,
lowerBoundMs: backoffNext,
context: "error_backoff",
})
: normalNext !== undefined
? Math.max(normalNext, backoffNext)
: backoffNext;
state.deps.log.info(
{
jobId: job.id,
consecutiveErrors: job.state.consecutiveErrors,
backoffMs: backoff,
nextRunAtMs: job.state.nextRunAtMs,
},
"cron: applying error backoff",
);
} else if (isJobEnabled(job)) {
let naturalNext: number | undefined;
try {
naturalNext =
opts?.preserveSchedule && job.schedule.kind === "every"
? computeNextWithPreservedLastRun(result.endedAt)
: computeJobNextRunAtMs(job, result.endedAt);
} catch (err) {
// If the schedule expression/timezone throws (croner edge cases),
// record the schedule error (auto-disables after repeated failures)
// so a persistent throw doesn't cause a MIN_REFIRE_GAP_MS hot loop.
recordScheduleComputeError({ state, job, err });
}
if (job.schedule.kind === "cron") {
// Safety net: ensure the next fire is at least MIN_REFIRE_GAP_MS
// after the current run ended. Prevents spin-loops when the
// schedule computation lands in the same second due to
// timezone/croner edge cases (see #17821).
const minNext = result.endedAt + MIN_REFIRE_GAP_MS;
job.state.nextRunAtMs = resolveCronNextRunWithLowerBound({
state,
job,
naturalNext,
lowerBoundMs: minNext,
context: "completion",
});
} else {
job.state.nextRunAtMs = naturalNext;
}
} else {
job.state.nextRunAtMs = undefined;
}
}
return shouldDelete;
}
function applyOutcomeToStoredJob(state: CronServiceState, result: TimedCronRunOutcome): void {
clearCronJobActive(result.jobId);
tryFinishCronTaskRun(state, result);
const store = state.store;
if (!store) {
return;
}
const jobs = store.jobs;
const job = jobs.find((entry) => entry.id === result.jobId);
if (!job) {
if (result.status === "ok") {
applyJobResult(state, result.job, {
status: result.status,
error: result.error,
diagnostics: result.diagnostics,
delivered: result.delivered,
provider: result.provider,
startedAt: result.startedAt,
endedAt: result.endedAt,
});
emitJobFinished(state, result.job, result, result.startedAt);
state.deps.log.info(
{ jobId: result.jobId },
"cron: finalized successful run after job was removed during execution",
);
return;
}
state.deps.log.warn(
{ jobId: result.jobId },
"cron: applyOutcomeToStoredJob — job not found after forceReload, result discarded",
);
return;
}
const shouldDelete = applyJobResult(state, job, {
status: result.status,
error: result.error,
diagnostics: result.diagnostics,
delivered: result.delivered,
provider: result.provider,
startedAt: result.startedAt,
endedAt: result.endedAt,
});
emitJobFinished(state, job, result, result.startedAt);
if (shouldDelete) {
store.jobs = jobs.filter((entry) => entry.id !== job.id);
emit(state, { jobId: job.id, action: "removed", job });
}
}
export function armTimer(state: CronServiceState) {
if (state.timer) {
clearTimeout(state.timer);
}
state.timer = null;
if (!state.deps.cronEnabled) {
state.deps.log.debug({}, "cron: armTimer skipped - scheduler disabled");
return;
}
const nextAt = nextWakeAtMs(state);
if (!nextAt) {
const jobCount = state.store?.jobs.length ?? 0;
const enabledCount = state.store?.jobs.filter((j) => j.enabled).length ?? 0;
const withNextRun =
state.store?.jobs.filter((j) => j.enabled && hasScheduledNextRunAtMs(j.state.nextRunAtMs))
.length ?? 0;
if (enabledCount > 0) {
armRunningRecheckTimer(state);
state.deps.log.debug(
{ jobCount, enabledCount, withNextRun, delayMs: MAX_TIMER_DELAY_MS },
"cron: timer armed for maintenance recheck",
);
return;
}
state.deps.log.debug(
{ jobCount, enabledCount, withNextRun },
"cron: armTimer skipped - no jobs with nextRunAtMs",
);
return;
}
const now = state.deps.nowMs();
const delay = Math.max(nextAt - now, 0);
// Floor: when the next wake time is in the past (delay === 0), enforce a
// minimum delay to prevent a tight setTimeout(0) loop. This can happen
// when a job has a stuck runningAtMs marker and a past-due nextRunAtMs:
// findDueJobs skips the job (blocked by runningAtMs), while
// recomputeNextRunsForMaintenance intentionally does not advance the
// past-due nextRunAtMs (per #13992). The finally block in onTimer then
// re-invokes armTimer with delay === 0, creating an infinite hot-loop
// that saturates the event loop and fills the log file to its size cap.
const flooredDelay = delay === 0 ? MIN_REFIRE_GAP_MS : delay;
// Wake at least once a minute to avoid schedule drift and recover quickly
// when the process was paused or wall-clock time jumps.
const clampedDelay = Math.min(flooredDelay, MAX_TIMER_DELAY_MS);
// Intentionally avoid an `async` timer callback:
// Vitest's fake-timer helpers can await async callbacks, which would block
// tests that simulate long-running jobs. Runtime behavior is unchanged.
state.timer = setTimeout(() => {
void onTimer(state).catch((err) => {
state.deps.log.error({ err: String(err) }, "cron: timer tick failed");
});
}, clampedDelay);
state.deps.log.debug(
{ nextAt, delayMs: clampedDelay, clamped: delay > MAX_TIMER_DELAY_MS },
"cron: timer armed",
);
}
function armRunningRecheckTimer(state: CronServiceState) {
if (state.timer) {
clearTimeout(state.timer);
}
state.timer = setTimeout(() => {
void onTimer(state).catch((err) => {
state.deps.log.error({ err: String(err) }, "cron: timer tick failed");
});
}, MAX_TIMER_DELAY_MS);
}
export async function onTimer(state: CronServiceState) {
if (state.running) {
// Re-arm the timer so the scheduler keeps ticking even when a job is
// still executing. Without this, a long-running job (e.g. an agentTurn
// exceeding MAX_TIMER_DELAY_MS) causes the clamped 60 s timer to fire
// while `running` is true. The early return then leaves no timer set,
// silently killing the scheduler until the next gateway restart.
//
// We use MAX_TIMER_DELAY_MS as a fixed re-check interval to avoid a
// zero-delay hot-loop when past-due jobs are waiting for the current
// execution to finish.
// See: https://github.com/openclaw/openclaw/issues/12025
armRunningRecheckTimer(state);
return;
}
state.running = true;
// Keep a watchdog timer armed while a tick is executing. If execution hangs
// (for example in a provider call), the scheduler still wakes to re-check.
armRunningRecheckTimer(state);
try {
const dueJobs = await locked(state, async () => {
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
const dueCheckNow = state.deps.nowMs();
const due = collectRunnableJobs(state, dueCheckNow);
if (due.length === 0) {
// Use maintenance-only recompute to avoid advancing past-due nextRunAtMs
// values without execution. This prevents jobs from being silently skipped
// when the timer wakes up but findDueJobs returns empty (see #13992).
const changed = recomputeNextRunsForMaintenance(state, {
recomputeExpired: true,
nowMs: dueCheckNow,
});
if (changed) {
await persist(state);
}
return [];
}
const now = state.deps.nowMs();
for (const job of due) {
job.state.runningAtMs = now;
job.state.lastError = undefined;
}
await persist(state);
return due.map((j) => ({
id: j.id,
job: j,
}));
});
const runDueJob = async (params: {
id: string;
job: CronJob;
}): Promise<TimedCronRunOutcome> => {
const { id, job } = params;
const startedAt = state.deps.nowMs();
job.state.runningAtMs = startedAt;
markCronJobActive(job.id);
emit(state, { jobId: job.id, action: "started", job, runAtMs: startedAt });
const jobTimeoutMs = resolveCronJobTimeoutMs(job);
const taskRunId = tryCreateCronTaskRun({ state, job, startedAt });
try {
const result = await executeJobCoreWithTimeout(state, job);
return {
jobId: id,
job,
taskRunId,
...result,
startedAt,
endedAt: state.deps.nowMs(),
};
} catch (err) {
const errorText = normalizeCronRunErrorText(err);
state.deps.log.warn(
{ jobId: id, jobName: job.name, timeoutMs: jobTimeoutMs ?? null },
`cron: job failed: ${errorText}`,
);
return {
jobId: id,
job,
taskRunId,
status: "error",
error: errorText,
diagnostics: createCronRunDiagnosticsFromError("cron-setup", errorText, {
nowMs: state.deps.nowMs,
}),
startedAt,
endedAt: state.deps.nowMs(),
};
}
};
const concurrency = Math.min(resolveRunConcurrency(state), Math.max(1, dueJobs.length));
const results: (TimedCronRunOutcome | undefined)[] = Array.from({ length: dueJobs.length });
let cursor = 0;
const workers = Array.from({ length: concurrency }, async () => {
for (;;) {
const index = cursor++;
if (index >= dueJobs.length) {
return;
}
const due = dueJobs[index];
if (!due) {
return;
}
results[index] = await runDueJob(due);
}
});
await Promise.all(workers);
const completedResults: TimedCronRunOutcome[] = results.filter(
(entry): entry is TimedCronRunOutcome => entry !== undefined,
);
if (completedResults.length > 0) {
await locked(state, async () => {
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
for (const result of completedResults) {
applyOutcomeToStoredJob(state, result);
}
// Use maintenance-only recompute to avoid advancing past-due
// nextRunAtMs values that became due between findDueJobs and this
// locked block. The full recomputeNextRuns would silently skip
// those jobs (advancing nextRunAtMs without execution), causing
// daily cron schedules to jump 48 h instead of 24 h (#17852).
recomputeNextRunsForMaintenance(state);
await persist(state);
});
}
} finally {
state.running = false;
armTimer(state);
}
}
function isRunnableJob(params: {
job: CronJob;
nowMs: number;
skipJobIds?: ReadonlySet<string>;
skipAtIfAlreadyRan?: boolean;
allowCronMissedRunByLastRun?: boolean;
}): boolean {
const { job, nowMs } = params;
if (!job.state) {
job.state = {};
}
if (!isJobEnabled(job)) {
return false;
}
if (params.skipJobIds?.has(job.id)) {
return false;
}
if (typeof job.state.runningAtMs === "number") {
return false;
}
if (params.skipAtIfAlreadyRan && job.schedule.kind === "at" && job.state.lastStatus) {
// One-shot with terminal status: skip unless it's a transient-error retry.
// Retries have nextRunAtMs > lastRunAtMs (scheduled after the failed run) (#24355).
// ok/skipped or error-without-retry always skip (#13845).
const lastRun = job.state.lastRunAtMs;
const nextRun = job.state.nextRunAtMs;
if (
job.state.lastStatus === "error" &&
isJobEnabled(job) &&
typeof nextRun === "number" &&
typeof lastRun === "number" &&
nextRun > lastRun
) {
return nowMs >= nextRun;
}
return false;
}
const next = job.state.nextRunAtMs;
if (hasScheduledNextRunAtMs(next) && nowMs >= next) {
return true;
}
if (hasScheduledNextRunAtMs(next) && next > nowMs && isErrorBackoffPending(job, nowMs)) {
// Respect active retry backoff windows on restart, but allow missed-slot
// replay once the backoff window has elapsed.
return false;
}
if (!params.allowCronMissedRunByLastRun || job.schedule.kind !== "cron") {
return false;
}
let previousRunAtMs: number | undefined;
try {
previousRunAtMs = computeJobPreviousRunAtMs(job, nowMs);
} catch {
return false;
}
if (typeof previousRunAtMs !== "number" || !Number.isFinite(previousRunAtMs)) {
return false;
}
const lastRunAtMs = job.state.lastRunAtMs;
if (typeof lastRunAtMs !== "number" || !Number.isFinite(lastRunAtMs)) {
// Only replay a "missed slot" when there is concrete run history.
return false;
}
return previousRunAtMs > lastRunAtMs;
}
function isErrorBackoffPending(job: CronJob, nowMs: number): boolean {
if (job.schedule.kind === "at" || job.state.lastStatus !== "error") {
return false;
}
const lastRunAtMs = job.state.lastRunAtMs;
if (typeof lastRunAtMs !== "number" || !Number.isFinite(lastRunAtMs)) {
return false;
}
const consecutiveErrorsRaw = job.state.consecutiveErrors;
const consecutiveErrors =
typeof consecutiveErrorsRaw === "number" && Number.isFinite(consecutiveErrorsRaw)
? Math.max(1, Math.floor(consecutiveErrorsRaw))
: 1;
return nowMs < lastRunAtMs + errorBackoffMs(consecutiveErrors);
}
function collectRunnableJobs(
state: CronServiceState,
nowMs: number,
opts?: {
skipJobIds?: ReadonlySet<string>;
skipAtIfAlreadyRan?: boolean;
allowCronMissedRunByLastRun?: boolean;
},
): CronJob[] {
if (!state.store) {
return [];
}
return state.store.jobs.filter((job) =>
isRunnableJob({
job,
nowMs,
skipJobIds: opts?.skipJobIds,
skipAtIfAlreadyRan: opts?.skipAtIfAlreadyRan,
allowCronMissedRunByLastRun: opts?.allowCronMissedRunByLastRun,
}),
);
}
export async function runMissedJobs(
state: CronServiceState,
opts?: { skipJobIds?: ReadonlySet<string>; deferAgentTurnJobs?: boolean },
) {
const plan = await planStartupCatchup(state, opts);
if (plan.candidates.length === 0 && plan.deferredJobs.length === 0) {
return;
}
const outcomes = await executeStartupCatchupPlan(state, plan);
await applyStartupCatchupOutcomes(state, plan, outcomes);
}
async function planStartupCatchup(
state: CronServiceState,
opts?: { skipJobIds?: ReadonlySet<string>; deferAgentTurnJobs?: boolean },
): Promise<StartupCatchupPlan> {
const maxImmediate = Math.max(
0,
state.deps.maxMissedJobsPerRestart ?? DEFAULT_MAX_MISSED_JOBS_PER_RESTART,
);
return locked(state, async () => {
await ensureLoaded(state, { skipRecompute: true });
if (!state.store) {
return { candidates: [], deferredJobs: [] };
}
const now = state.deps.nowMs();
const missed = collectRunnableJobs(state, now, {
skipJobIds: opts?.skipJobIds,
skipAtIfAlreadyRan: true,
allowCronMissedRunByLastRun: true,
});
if (missed.length === 0) {
return { candidates: [], deferredJobs: [] };
}
const sorted = missed.toSorted(
(a, b) => (a.state.nextRunAtMs ?? 0) - (b.state.nextRunAtMs ?? 0),
);
const deferredAgentJobs = opts?.deferAgentTurnJobs
? sorted.filter((job) => job.payload.kind === "agentTurn")
: [];
const startupEligible = opts?.deferAgentTurnJobs
? sorted.filter((job) => job.payload.kind !== "agentTurn")
: sorted;
const startupCandidates = startupEligible.slice(0, maxImmediate);
const deferredOverflow = startupEligible.slice(maxImmediate);
const deferredAgentDelayMs = Math.max(
0,
state.deps.startupDeferredMissedAgentJobDelayMs ??
DEFAULT_STARTUP_DEFERRED_MISSED_AGENT_JOB_DELAY_MS,
);
const deferred: StartupDeferredJob[] = [
...deferredOverflow.map((job) => ({ jobId: job.id })),
...deferredAgentJobs.map((job) => ({ jobId: job.id, delayMs: deferredAgentDelayMs })),
];
if (deferred.length > 0) {
state.deps.log.info(
{
immediateCount: startupCandidates.length,
deferredCount: deferred.length,
totalMissed: missed.length,
},
"cron: staggering missed jobs to prevent gateway overload",
);
}
if (deferredAgentJobs.length > 0) {
state.deps.log.info(
{
count: deferredAgentJobs.length,
jobIds: deferredAgentJobs.map((job) => job.id),
delayMs: deferredAgentDelayMs,
},
"cron: deferring missed agent jobs until after gateway startup",
);
}
if (startupCandidates.length > 0) {
state.deps.log.info(
{ count: startupCandidates.length, jobIds: startupCandidates.map((j) => j.id) },
"cron: running missed jobs after restart",
);
}
for (const job of startupCandidates) {
job.state.runningAtMs = now;
job.state.lastError = undefined;
}
await persist(state);
return {
candidates: startupCandidates.map((job) => ({ jobId: job.id, job })),
deferredJobs: deferred,
};
});
}
async function executeStartupCatchupPlan(
state: CronServiceState,
plan: StartupCatchupPlan,
): Promise<TimedCronRunOutcome[]> {
const outcomes: TimedCronRunOutcome[] = [];
for (const candidate of plan.candidates) {
outcomes.push(await runStartupCatchupCandidate(state, candidate));
}
return outcomes;
}
async function runStartupCatchupCandidate(
state: CronServiceState,
candidate: StartupCatchupCandidate,
): Promise<TimedCronRunOutcome> {
const startedAt = state.deps.nowMs();
const taskRunId = tryCreateCronTaskRun({
state,
job: candidate.job,
startedAt,
});
emit(state, {
jobId: candidate.job.id,
action: "started",
job: candidate.job,
runAtMs: startedAt,
});
try {
const result = await executeJobCoreWithTimeout(state, candidate.job);
return {
jobId: candidate.jobId,
job: candidate.job,
taskRunId,
status: result.status,
error: result.error,
summary: result.summary,
diagnostics: result.diagnostics,
delivered: result.delivered,
sessionId: result.sessionId,
sessionKey: result.sessionKey,
model: result.model,
provider: result.provider,
usage: result.usage,
startedAt,
endedAt: state.deps.nowMs(),
};
} catch (err) {
return {
jobId: candidate.jobId,
job: candidate.job,
taskRunId,
status: "error",
error: normalizeCronRunErrorText(err),
diagnostics: createCronRunDiagnosticsFromError("cron-setup", normalizeCronRunErrorText(err), {
nowMs: state.deps.nowMs,
}),
startedAt,
endedAt: state.deps.nowMs(),
};
}
}
async function applyStartupCatchupOutcomes(
state: CronServiceState,
plan: StartupCatchupPlan,
outcomes: TimedCronRunOutcome[],
): Promise<void> {
const staggerMs = Math.max(0, state.deps.missedJobStaggerMs ?? DEFAULT_MISSED_JOB_STAGGER_MS);
await locked(state, async () => {
// Startup catch-up runs during service bootstrap, before the timer loop is
// armed. Reuse the in-memory store instead of forcing a second reload.
await ensureLoaded(state, { skipRecompute: true });
if (!state.store) {
return;
}
for (const result of outcomes) {
applyOutcomeToStoredJob(state, result);
}
if (plan.deferredJobs.length > 0) {
const baseNow = state.deps.nowMs();
let offset = staggerMs;
for (const deferred of plan.deferredJobs) {
const jobId = deferred.jobId;
const job = state.store.jobs.find((entry) => entry.id === jobId);
if (!job || !isJobEnabled(job)) {
continue;
}
if (typeof deferred.delayMs === "number") {
job.state.nextRunAtMs = baseNow + deferred.delayMs + offset - staggerMs;
offset += staggerMs;
continue;
}
job.state.nextRunAtMs = baseNow + offset;
offset += staggerMs;
}
}
// Preserve any new past-due nextRunAtMs values that became due while
// startup catch-up was running. They should execute on a future tick
// instead of being silently advanced. Future repair is disabled here so
// startup overflow deferrals survive until their staggered catch-up tick.
recomputeNextRunsForMaintenance(state, { repairFutureCronNextRunAtMs: false });
await persist(state);
});
}
export async function executeJobCore(
state: CronServiceState,
job: CronJob,
abortSignal?: AbortSignal,
options?: {
onExecutionStarted?: (info?: CronAgentExecutionStarted) => void;
onExecutionPhase?: (info: CronAgentExecutionPhaseUpdate) => void;
},
): Promise<
CronRunOutcome &
CronRunTelemetry & {
delivered?: boolean;
deliveryAttempted?: boolean;
delivery?: CronDeliveryTrace;
}
> {
const resolveAbortError = () => ({
status: "error" as const,
error: abortErrorMessage(abortSignal),
});
const waitWithAbort = async (ms: number) => {
if (!abortSignal) {
await new Promise<void>((resolve) => setTimeout(resolve, ms));
return;
}
if (abortSignal.aborted) {
return;
}
await new Promise<void>((resolve) => {
const timer = setTimeout(() => {
abortSignal.removeEventListener("abort", onAbort);
resolve();
}, ms);
const onAbort = () => {
clearTimeout(timer);
abortSignal.removeEventListener("abort", onAbort);
resolve();
};
abortSignal.addEventListener("abort", onAbort, { once: true });
});
};
if (abortSignal?.aborted) {
return resolveAbortError();
}
if (job.sessionTarget === "main") {
return await executeMainSessionCronJob(state, job, abortSignal, waitWithAbort);
}
return await executeDetachedCronJob(state, job, abortSignal, resolveAbortError, options);
}
async function executeMainSessionCronJob(
state: CronServiceState,
job: CronJob,
abortSignal: AbortSignal | undefined,
waitWithAbort: (ms: number) => Promise<void>,
): Promise<
CronRunOutcome &
CronRunTelemetry & {
delivered?: boolean;
deliveryAttempted?: boolean;
delivery?: CronDeliveryTrace;
}
> {
const text = resolveJobPayloadTextForMain(job);
if (!text) {
const kind = job.payload.kind;
return {
status: "skipped",
error:
kind === "systemEvent"
? "main job requires non-empty systemEvent text"
: 'main job requires payload.kind="systemEvent"',
};
}
const targetMainSessionKey = job.sessionKey;
state.deps.enqueueSystemEvent(text, {
agentId: job.agentId,
sessionKey: targetMainSessionKey,
contextKey: `cron:${job.id}`,
});
if (job.wakeMode === "now" && state.deps.runHeartbeatOnce) {
const reason = `cron:${job.id}`;
const maxWaitMs = state.deps.wakeNowHeartbeatBusyMaxWaitMs ?? 2 * 60_000;
const retryDelayMs = state.deps.wakeNowHeartbeatBusyRetryDelayMs ?? 250;
const waitStartedAt = state.deps.nowMs();
let heartbeatResult: HeartbeatRunResult;
for (;;) {
if (abortSignal?.aborted) {
return { status: "error", error: timeoutErrorMessage() };
}
heartbeatResult = await state.deps.runHeartbeatOnce({
source: "cron",
intent: "immediate",
reason,
agentId: job.agentId,
sessionKey: targetMainSessionKey,
heartbeat: { target: "last" },
});
if (
heartbeatResult.status !== "skipped" ||
!isRetryableHeartbeatBusySkipReason(heartbeatResult.reason)
) {
break;
}
if (heartbeatResult.reason === HEARTBEAT_SKIP_CRON_IN_PROGRESS) {
// The active cron marker blocks direct wake-now until this job returns.
state.deps.requestHeartbeat({
source: "cron",
intent: "immediate",
reason,
agentId: job.agentId,
sessionKey: targetMainSessionKey,
heartbeat: { target: "last" },
});
return { status: "ok", summary: text };
}
if (abortSignal?.aborted) {
return { status: "error", error: timeoutErrorMessage() };
}
if (state.deps.nowMs() - waitStartedAt > maxWaitMs) {
if (abortSignal?.aborted) {
return { status: "error", error: timeoutErrorMessage() };
}
state.deps.requestHeartbeat({
source: "cron",
intent: "immediate",
reason,
agentId: job.agentId,
sessionKey: targetMainSessionKey,
heartbeat: { target: "last" },
});
return { status: "ok", summary: text };
}
await waitWithAbort(retryDelayMs);
}
if (heartbeatResult.status === "ran") {
return { status: "ok", summary: text };
}
if (heartbeatResult.status === "skipped") {
return { status: "skipped", error: heartbeatResult.reason, summary: text };
}
return { status: "error", error: heartbeatResult.reason, summary: text };
}
if (abortSignal?.aborted) {
return { status: "error", error: timeoutErrorMessage() };
}
state.deps.requestHeartbeat({
source: "cron",
intent: job.wakeMode === "now" ? "immediate" : "event",
reason: `cron:${job.id}`,
agentId: job.agentId,
sessionKey: targetMainSessionKey,
heartbeat: { target: "last" },
});
return { status: "ok", summary: text };
}
async function executeDetachedCronJob(
state: CronServiceState,
job: CronJob,
abortSignal: AbortSignal | undefined,
resolveAbortError: () => { status: "error"; error: string },
options?: {
onExecutionStarted?: (info?: CronAgentExecutionStarted) => void;
onExecutionPhase?: (info: CronAgentExecutionPhaseUpdate) => void;
},
): Promise<
CronRunOutcome &
CronRunTelemetry & {
delivered?: boolean;
deliveryAttempted?: boolean;
delivery?: CronDeliveryTrace;
}
> {
if (job.payload.kind !== "agentTurn") {
const error = "isolated job requires payload.kind=agentTurn";
return {
status: "skipped",
error,
diagnostics: createCronRunDiagnosticsFromError("cron-preflight", error, {
severity: "warn",
nowMs: state.deps.nowMs,
}),
};
}
if (abortSignal?.aborted) {
const aborted = resolveAbortError();
return {
...aborted,
diagnostics: createCronRunDiagnosticsFromError("cron-setup", aborted.error, {
nowMs: state.deps.nowMs,
}),
};
}
const res = await state.deps.runIsolatedAgentJob({
job,
message: job.payload.message,
abortSignal,
onExecutionStarted: options?.onExecutionStarted,
onExecutionPhase: options?.onExecutionPhase,
});
if (abortSignal?.aborted) {
const error = abortErrorMessage(abortSignal);
return {
status: "error",
error,
diagnostics: createCronRunDiagnosticsFromError("cron-setup", error, {
nowMs: state.deps.nowMs,
}),
};
}
return {
status: res.status,
error: res.error,
summary: res.summary,
delivered: res.delivered,
deliveryAttempted: res.deliveryAttempted,
delivery: res.delivery,
sessionId: res.sessionId,
sessionKey: res.sessionKey,
diagnostics: res.diagnostics,
model: res.model,
provider: res.provider,
usage: res.usage,
};
}
/**
* Execute a job. This version is used by the `run` command and other
* places that need the full execution with state updates.
*/
export async function executeJob(
state: CronServiceState,
job: CronJob,
_nowMs: number,
_opts: { forced: boolean },
) {
if (!job.state) {
job.state = {};
}
const startedAt = state.deps.nowMs();
job.state.runningAtMs = startedAt;
job.state.lastError = undefined;
markCronJobActive(job.id);
emit(state, { jobId: job.id, action: "started", job, runAtMs: startedAt });
let coreResult: {
status: CronRunStatus;
delivered?: boolean;
delivery?: CronDeliveryTrace;
} & CronRunOutcome &
CronRunTelemetry;
try {
coreResult = await executeJobCoreWithTimeout(state, job);
} catch (err) {
coreResult = { status: "error", error: String(err) };
}
const endedAt = state.deps.nowMs();
const shouldDelete = applyJobResult(state, job, {
status: coreResult.status,
error: coreResult.error,
diagnostics: coreResult.diagnostics,
delivered: coreResult.delivered,
provider: coreResult.provider,
startedAt,
endedAt,
});
emitJobFinished(state, job, coreResult, startedAt);
if (shouldDelete && state.store) {
state.store.jobs = state.store.jobs.filter((j) => j.id !== job.id);
emit(state, { jobId: job.id, action: "removed", job });
}
clearCronJobActive(job.id);
}
function emitJobFinished(
state: CronServiceState,
job: CronJob,
result: {
status: CronRunStatus;
delivered?: boolean;
delivery?: CronDeliveryTrace;
} & CronRunOutcome &
CronRunTelemetry,
runAtMs: number,
) {
emit(state, {
jobId: job.id,
action: "finished",
job,
status: result.status,
error: result.error,
summary: result.summary,
diagnostics: result.diagnostics,
delivered: job.state.lastDelivered,
deliveryStatus: job.state.lastDeliveryStatus,
deliveryError: job.state.lastDeliveryError,
failureNotificationDelivery: failureNotificationDeliveryFromJobState(job),
delivery: result.delivery,
sessionId: result.sessionId,
sessionKey: result.sessionKey,
runAtMs,
durationMs: job.state.lastDurationMs,
nextRunAtMs: job.state.nextRunAtMs,
model: result.model,
provider: result.provider,
usage: result.usage,
});
}
export function wake(
state: CronServiceState,
opts: { mode: "now" | "next-heartbeat"; text: string; sessionKey?: string },
) {
const text = opts.text.trim();
if (!text) {
return { ok: false } as const;
}
const sessionKey = opts.sessionKey?.trim() || undefined;
if (sessionKey && isSubagentSessionKey(sessionKey)) {
return { ok: false, reason: "unwakeable-session-key" } as const;
}
state.deps.enqueueSystemEvent(text, sessionKey ? { sessionKey } : undefined);
if (opts.mode === "now") {
state.deps.requestHeartbeat({
source: "manual",
intent: "immediate",
reason: "wake",
...(sessionKey ? { sessionKey } : {}),
});
}
return { ok: true } as const;
}
export function stopTimer(state: CronServiceState) {
if (state.timer) {
clearTimeout(state.timer);
}
state.timer = null;
}
export function emit(state: CronServiceState, evt: CronEvent) {
try {
state.deps.onEvent?.(evt);
} catch {
/* ignore */
}
}