fix: harden cron validation and restart state

This commit is contained in:
Peter Steinberger
2026-05-31 21:25:47 -04:00
parent cc97eca9b1
commit d2f1c0eac8
5 changed files with 261 additions and 18 deletions

View File

@@ -163,6 +163,41 @@ describe("CronService restart catch-up", () => {
);
});
// Seeds `withRestartedCron` with a one-shot ("at") job whose persisted state
// records its outcome only under `lastRunStatus` (there is no `lastStatus`
// field) and checks that the job is not fired again.
it("does not replay completed one-shot jobs restored with lastRunStatus only", async () => {
const dueAt = Date.parse("2025-12-13T16:00:00.000Z");
await withRestartedCron(
[
{
id: "restart-one-shot-last-run-status",
name: "finished one shot",
enabled: true,
createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"),
updatedAtMs: dueAt,
schedule: { kind: "at", at: "2025-12-13T16:00:00.000Z" },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "do not replay one shot" },
state: {
// The next-run slot still points at the scheduled instant, while the run
// recorded at that same instant finished with status "ok".
nextRunAtMs: dueAt,
lastRunAtMs: dueAt,
lastRunStatus: "ok",
},
},
],
async ({ cron, enqueueSystemEvent, requestHeartbeat }) => {
// Nothing may be delivered for the already-completed job.
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeat).not.toHaveBeenCalled();
const listedJobs = await cron.list({ includeDisabled: true });
const updated = listedJobs.find((job) => job.id === "restart-one-shot-last-run-status");
// The stale next-run slot is cleared; the stored status is kept as-is and
// `lastStatus` is not filled in.
expect(updated?.state.nextRunAtMs).toBeUndefined();
expect(updated?.state.lastRunStatus).toBe("ok");
expect(updated?.state.lastStatus).toBeUndefined();
},
);
});
it("defers overdue isolated agent-turn jobs during gateway startup", async () => {
const store = await makeStorePath();
const startNow = Date.parse("2025-12-13T17:00:00.000Z");
@@ -469,6 +504,46 @@ describe("CronService restart catch-up", () => {
);
});
// Seeds `withRestartedCron` with a recurring ("every") job whose last run
// failed and whose failure is recorded only under `lastRunStatus` (there is no
// `lastStatus` field), then checks that the past-due slot is not executed.
it("keeps past-due retries paused when restored with lastRunStatus only", async () => {
// Clock is pinned to 17:00:00Z: the stored next run (16:59:50Z) is 10s overdue
// and the failed run (16:59:45Z) happened 15s ago.
vi.setSystemTime(new Date("2025-12-13T17:00:00.000Z"));
await withRestartedCron(
[
{
id: "restart-backoff-last-run-status",
name: "lastRunStatus backoff pending",
enabled: true,
createdAtMs: Date.parse("2025-12-13T16:50:00.000Z"),
updatedAtMs: Date.parse("2025-12-13T16:59:45.000Z"),
schedule: {
kind: "every",
everyMs: 60_000,
anchorMs: Date.parse("2025-12-13T16:50:00.000Z"),
},
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "do not run during lastRunStatus backoff" },
state: {
nextRunAtMs: Date.parse("2025-12-13T16:59:50.000Z"),
lastRunAtMs: Date.parse("2025-12-13T16:59:45.000Z"),
lastDurationMs: 0,
lastRunStatus: "error",
consecutiveErrors: 1,
},
},
],
async ({ cron, enqueueSystemEvent, requestHeartbeat }) => {
// The overdue slot must not be delivered.
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeat).not.toHaveBeenCalled();
const listedJobs = await cron.list({ includeDisabled: true });
const updated = listedJobs.find((job) => job.id === "restart-backoff-last-run-status");
// The next run is pushed past the pinned "now"; the stored status is kept
// as-is and `lastStatus` is not filled in.
expect(updated?.state.nextRunAtMs).toBeGreaterThan(Date.parse("2025-12-13T17:00:00.000Z"));
expect(updated?.state.lastRunStatus).toBe("error");
expect(updated?.state.lastStatus).toBeUndefined();
},
);
});
it("replays missed cron slot after restart when error backoff has already elapsed", async () => {
vi.setSystemTime(new Date("2025-12-13T04:02:00.000Z"));
await withRestartedCron(

View File

@@ -54,6 +54,10 @@ export function hasScheduledNextRunAtMs(value: unknown): value is number {
return isFiniteTimestamp(value) && value > 0;
}
/**
 * Returns the status of the job's most recent run.
 *
 * `state.lastRunStatus` takes precedence; `state.lastStatus` is used only when
 * `lastRunStatus` is `null` or `undefined`.
 *
 * @param job - Any object exposing the cron job `state`.
 * @returns The resolved status, or `undefined` when neither field is set.
 */
export function resolveJobLastRunStatus(job: Pick<CronJob, "state">) {
  const { lastRunStatus, lastStatus } = job.state;
  return lastRunStatus ?? lastStatus;
}
export function errorBackoffMs(
consecutiveErrors: number,
scheduleMs = DEFAULT_ERROR_BACKOFF_SCHEDULE_MS,
@@ -66,7 +70,7 @@ export function resolveJobErrorBackoffUntilMs(
job: CronJob,
scheduleMs = DEFAULT_ERROR_BACKOFF_SCHEDULE_MS,
): number | undefined {
if (job.state.lastStatus !== "error" || !isFiniteTimestamp(job.state.lastRunAtMs)) {
if (resolveJobLastRunStatus(job) !== "error" || !isFiniteTimestamp(job.state.lastRunAtMs)) {
return undefined;
}
const consecutiveErrorsRaw = job.state.consecutiveErrors;
@@ -403,7 +407,7 @@ export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | und
const atMs = parseAbsoluteTimeMs(job.schedule.at);
// One-shot jobs stay due until they successfully finish, but if the
// schedule was updated to a time after the last run, re-arm the job.
if (job.state.lastStatus === "ok" && job.state.lastRunAtMs) {
if (resolveJobLastRunStatus(job) === "ok" && job.state.lastRunAtMs) {
if (atMs !== null && Number.isFinite(atMs) && atMs > job.state.lastRunAtMs) {
return atMs;
}
@@ -565,7 +569,7 @@ function recomputeJobNextRunAtMs(params: { state: CronServiceState; job: CronJob
let newNext = computeJobNextRunAtMs(params.job, params.nowMs);
if (
params.job.schedule.kind !== "at" &&
params.job.state.lastStatus === "error" &&
resolveJobLastRunStatus(params.job) === "error" &&
isFiniteTimestamp(params.job.state.lastRunAtMs)
) {
const backoffFloor = resolveJobErrorBackoffUntilMs(

View File

@@ -57,6 +57,7 @@ import {
recomputeNextRunsForMaintenance,
recordScheduleComputeError,
resolveJobErrorBackoffUntilMs,
resolveJobLastRunStatus,
resolveJobPayloadTextForMain,
} from "./jobs.js";
import { locked } from "./locked.js";
@@ -968,14 +969,15 @@ function isRunnableJob(params: {
if (typeof job.state.runningAtMs === "number") {
return false;
}
if (params.skipAtIfAlreadyRan && job.schedule.kind === "at" && job.state.lastStatus) {
const lastRunStatus = resolveJobLastRunStatus(job);
if (params.skipAtIfAlreadyRan && job.schedule.kind === "at" && lastRunStatus) {
// One-shot with terminal status: skip unless it's a transient-error retry.
// Retries have nextRunAtMs > lastRunAtMs (scheduled after the failed run) (#24355).
// ok/skipped or error-without-retry always skip (#13845).
const lastRun = job.state.lastRunAtMs;
const nextRun = job.state.nextRunAtMs;
if (
job.state.lastStatus === "error" &&
lastRunStatus === "error" &&
isJobEnabled(job) &&
typeof nextRun === "number" &&
typeof lastRun === "number" &&
@@ -1015,7 +1017,7 @@ function isRunnableJob(params: {
}
function isErrorBackoffPending(state: CronServiceState, job: CronJob, nowMs: number): boolean {
if (job.schedule.kind === "at" || job.state.lastStatus !== "error") {
if (job.schedule.kind === "at" || resolveJobLastRunStatus(job) !== "error") {
return false;
}
const backoffUntilMs = resolveJobErrorBackoffUntilMs(

View File

@@ -160,18 +160,14 @@ function assertValidCronCreateDelivery(cfg: OpenClawConfig, jobCreate: CronJobCr
});
}
function assertValidCronUpdateDelivery(params: {
function assertValidCronUpdatePatch(params: {
cfg: OpenClawConfig;
defaultAgentId?: string;
currentJob: CronJob | undefined;
currentJob: CronJob;
patch: CronJobPatch;
}) {
if (!params.currentJob || !("delivery" in params.patch)) {
return;
}
// Validate the post-patch job, not just the sparse patch, because delivery
// fields can be split across the existing job and the update payload.
// and payload/session fields can be split across existing state and patch.
const nextJob = structuredClone(params.currentJob);
applyJobPatch(nextJob, params.patch, {
defaultAgentId: params.defaultAgentId,
@@ -212,6 +208,25 @@ function cronRunLogPageFilters(params: CronRunsRequestParams) {
};
}
/**
 * Reports whether a thrown value carries one of the known cron
 * request-validation messages.
 *
 * Classification is purely textual: the value is rendered with
 * `formatErrorMessage` and compared against a fixed list. The
 * "unknown cron job id:" text must appear at the start of the message; every
 * other fragment may appear anywhere in it.
 *
 * @param err - The caught value.
 * @returns `true` when the message matches a known validation failure.
 */
function isCronInvalidRequestError(err: unknown): boolean {
  const text = formatErrorMessage(err);
  if (text.startsWith("unknown cron job id:")) {
    return true;
  }
  const invalidRequestFragments = [
    "cron job is missing sessionTarget",
    "invalid cron sessionTarget session id",
    'main cron jobs require payload.kind="systemEvent"',
    'isolated/current/session cron jobs require payload.kind="agentTurn"',
    'sessionTarget "main" is only valid for the default agent',
    'cron.update payload.kind="systemEvent" requires text',
    'cron.update payload.kind="agentTurn" requires message',
    "cron webhook delivery requires",
    "cron completion destination webhook requires",
    "cron failure destination webhook requires",
    "cron channel delivery config is only supported",
    "cron delivery.failureDestination is only supported",
  ];
  return invalidRequestFragments.some((fragment) => text.includes(fragment));
}
export const cronHandlers: GatewayRequestHandlers = {
wake: ({ params, respond, context }) => {
if (!validateWakeParams(params)) {
@@ -392,7 +407,11 @@ export const cronHandlers: GatewayRequestHandlers = {
try {
job = await context.cron.add(jobCreate);
} catch (err) {
if (!(err instanceof TypeError) && !(err instanceof RangeError)) {
if (
!(err instanceof TypeError) &&
!(err instanceof RangeError) &&
!isCronInvalidRequestError(err)
) {
throw err;
}
respond(
@@ -454,6 +473,11 @@ export const cronHandlers: GatewayRequestHandlers = {
}
const patch = p.patch as unknown as CronJobPatch;
const cfg = context.getRuntimeConfig();
const currentJob = await context.cron.readJob(jobId);
if (!currentJob) {
respondInvalidCronParams(respond, "cron.update", "id not found");
return;
}
if (patch.schedule) {
const timestampValidation = validateScheduleTimestamp(patch.schedule);
if (!timestampValidation.ok) {
@@ -466,10 +490,10 @@ export const cronHandlers: GatewayRequestHandlers = {
}
}
try {
assertValidCronUpdateDelivery({
assertValidCronUpdatePatch({
cfg,
defaultAgentId: context.cron.getDefaultAgentId(),
currentJob: context.cron.getJob(jobId),
currentJob,
patch,
});
} catch (err) {
@@ -487,7 +511,11 @@ export const cronHandlers: GatewayRequestHandlers = {
try {
job = await context.cron.update(jobId, patch);
} catch (err) {
if (!(err instanceof TypeError) && !(err instanceof RangeError)) {
if (
!(err instanceof TypeError) &&
!(err instanceof RangeError) &&
!isCronInvalidRequestError(err)
) {
throw err;
}
respond(
@@ -552,6 +580,10 @@ export const cronHandlers: GatewayRequestHandlers = {
respond(true, { ok: true, ran: false, reason: "invalid-spec" }, undefined);
return;
}
if (isCronInvalidRequestError(error)) {
respondInvalidCronParams(respond, "cron.run", formatErrorMessage(error));
return;
}
throw error;
}
respond(true, result, undefined);

View File

@@ -76,6 +76,7 @@ function createCronContext(currentJob?: CronJob) {
add: vi.fn(async () => ({ id: "cron-1" })),
update: vi.fn(async () => ({ id: "cron-1" })),
remove: vi.fn(async () => ({ ok: true, removed: true })),
enqueueRun: vi.fn(async () => ({ ok: true, enqueued: true, runId: "run-1" })),
getDefaultAgentId: vi.fn(() => "main"),
getJob: vi.fn(() => currentJob),
wake: vi.fn(() => ({ ok: true }) as const),
@@ -116,7 +117,7 @@ async function invokeCronGet(params: Record<string, unknown>, currentJob?: CronJ
return await invokeCron("cron.get", params, { currentJob });
}
async function invokeCronUpdate(params: Record<string, unknown>, currentJob: CronJob) {
async function invokeCronUpdate(params: Record<string, unknown>, currentJob?: CronJob) {
return await invokeCron("cron.update", params, { currentJob });
}
@@ -562,6 +563,51 @@ describe("cron method validation", () => {
expectResponseError(respond, { messageIncludes: "delivery.channel is required" });
});
// Checks that cron.update validation looks the job up through `cron.readJob`
// and never touches `cron.getJob`.
it("loads the cron job before validating update delivery patches", async () => {
// Runtime config with two channels (telegram and slack) configured and enabled.
getRuntimeConfig.mockReturnValue({
session: {
mainKey: "main",
},
channels: {
telegram: {
botToken: "telegram-token",
},
slack: {
botToken: "xoxb-slack-token",
appToken: "xapp-slack-token",
},
},
plugins: {
entries: {
telegram: { enabled: true },
slack: { enabled: true },
},
},
} as OpenClawConfig);
const context = createCronContext(createCronJob());
// Make `getJob` return nothing, so a delivery error can only be reported if
// the job was obtained some other way.
context.cron.getJob.mockReturnValue(undefined);
const respond = vi.fn();
await cronHandlers["cron.update"]({
req: {} as never,
params: {
id: "cron-1",
patch: {
// Announce mode without a channel is the invalid part of this patch.
delivery: { mode: "announce" },
},
} as never,
respond: respond as never,
context: context as never,
client: null,
isWebchatConnect: () => false,
});
expect(context.cron.readJob).toHaveBeenCalledWith("cron-1");
expect(context.cron.getJob).not.toHaveBeenCalled();
// The patch is rejected before the service update is invoked.
expect(context.cron.update).not.toHaveBeenCalled();
expectResponseError(respond, { messageIncludes: "delivery.channel is required" });
});
it("rejects target ids mistakenly supplied as delivery.channel providers", async () => {
setRuntimeConfig({
session: {
@@ -610,6 +656,37 @@ describe("cron method validation", () => {
expectResponseError(respond, { code: "INVALID_REQUEST", messageIncludes: "CronPattern" });
});
// A plain `Error` thrown by `cron.add` with the "sessionTarget main is only
// valid for the default agent" message must be answered as INVALID_REQUEST.
it("returns INVALID_REQUEST when cron.add rejects an incompatible main agent", async () => {
const context = createCronContext();
// The service mock rejects once with a generic Error (not TypeError/RangeError).
context.cron.add.mockRejectedValueOnce(
new Error(
'cron: sessionTarget "main" is only valid for the default agent. Use sessionTarget "isolated" with payload.kind "agentTurn" for non-default agents (agentId: worker)',
),
);
const respond = vi.fn();
await cronHandlers["cron.add"]({
req: {} as never,
params: {
name: "bad-main-agent",
enabled: true,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "ping" },
agentId: "worker",
} as never,
respond: respond as never,
context: context as never,
client: null,
isWebchatConnect: () => false,
});
expectResponseError(respond, {
code: "INVALID_REQUEST",
messageIncludes: 'sessionTarget "main" is only valid',
});
});
it("returns INVALID_REQUEST when cron.update throws a croner parse error (#74066)", async () => {
const existingJob = createCronJob();
const context = createCronContext(existingJob);
@@ -630,6 +707,59 @@ describe("cron method validation", () => {
expectResponseError(respond, { code: "INVALID_REQUEST", messageIncludes: "CronPattern" });
});
// Calls cron.update without supplying a current job to the helper and expects
// an "id not found" INVALID_REQUEST response.
it("returns INVALID_REQUEST when cron.update cannot find the job", async () => {
const { context, respond } = await invokeCronUpdate({
id: "missing",
patch: { enabled: false },
});
// The service update must not be reached for an unknown id.
expect(context.cron.update).not.toHaveBeenCalled();
expectResponseError(respond, {
code: "INVALID_REQUEST",
messageIncludes: "invalid cron.update params: id not found",
});
});
// The stored job is isolated with an agentTurn payload; the patch swaps in a
// systemEvent payload only, so the combined job would be inconsistent.
it("rejects cron.update payload/session mismatches before calling the service update", async () => {
const { context, respond } = await invokeCronUpdate(
{
id: "cron-1",
patch: {
payload: { kind: "systemEvent", text: "wake main" },
},
},
createCronJob({
sessionTarget: "isolated",
payload: { kind: "agentTurn", message: "hello" },
}),
);
// The mismatch is reported without invoking the service update.
expect(context.cron.update).not.toHaveBeenCalled();
expectResponseError(respond, {
code: "INVALID_REQUEST",
messageIncludes: 'isolated/current/session cron jobs require payload.kind="agentTurn"',
});
});
// An "unknown cron job id" rejection from `cron.enqueueRun` must be answered
// as INVALID_REQUEST, with the original message included in the response.
it("returns INVALID_REQUEST when cron.run cannot find the job", async () => {
const context = createCronContext();
context.cron.enqueueRun.mockRejectedValueOnce(new Error("unknown cron job id: missing"));
const respond = vi.fn();
await cronHandlers["cron.run"]({
req: {} as never,
params: { id: "missing" } as never,
respond: respond as never,
context: context as never,
client: null,
isWebchatConnect: () => false,
});
expectResponseError(respond, {
code: "INVALID_REQUEST",
messageIncludes: "unknown cron job id: missing",
});
});
it("re-throws non-parse errors from cron.add instead of masking as INVALID_REQUEST", async () => {
const context = createCronContext();
context.cron.add.mockRejectedValueOnce(new Error("DB write failed"));