From d2f1c0eac80db106e8b4db7f75c2e7d3bdbc5f18 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 31 May 2026 21:25:47 -0400 Subject: [PATCH] fix: harden cron validation and restart state --- src/cron/service.restart-catchup.test.ts | 75 ++++++++++ src/cron/service/jobs.ts | 10 +- src/cron/service/timer.ts | 8 +- src/gateway/server-methods/cron.ts | 54 +++++-- .../server-methods/cron.validation.test.ts | 132 +++++++++++++++++- 5 files changed, 261 insertions(+), 18 deletions(-) diff --git a/src/cron/service.restart-catchup.test.ts b/src/cron/service.restart-catchup.test.ts index bcb12bcabe3..49dc0f9ff7d 100644 --- a/src/cron/service.restart-catchup.test.ts +++ b/src/cron/service.restart-catchup.test.ts @@ -163,6 +163,41 @@ describe("CronService restart catch-up", () => { ); }); + it("does not replay completed one-shot jobs restored with lastRunStatus only", async () => { + const dueAt = Date.parse("2025-12-13T16:00:00.000Z"); + + await withRestartedCron( + [ + { + id: "restart-one-shot-last-run-status", + name: "finished one shot", + enabled: true, + createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"), + updatedAtMs: dueAt, + schedule: { kind: "at", at: "2025-12-13T16:00:00.000Z" }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "do not replay one shot" }, + state: { + nextRunAtMs: dueAt, + lastRunAtMs: dueAt, + lastRunStatus: "ok", + }, + }, + ], + async ({ cron, enqueueSystemEvent, requestHeartbeat }) => { + expect(enqueueSystemEvent).not.toHaveBeenCalled(); + expect(requestHeartbeat).not.toHaveBeenCalled(); + + const listedJobs = await cron.list({ includeDisabled: true }); + const updated = listedJobs.find((job) => job.id === "restart-one-shot-last-run-status"); + expect(updated?.state.nextRunAtMs).toBeUndefined(); + expect(updated?.state.lastRunStatus).toBe("ok"); + expect(updated?.state.lastStatus).toBeUndefined(); + }, + ); + }); + it("defers overdue isolated agent-turn jobs during gateway startup", async () => { const store = await makeStorePath(); const startNow = Date.parse("2025-12-13T17:00:00.000Z"); @@ -469,6 +504,46 @@ describe("CronService restart catch-up", () => { ); }); + it("keeps past-due retries paused when restored with lastRunStatus only", async () => { + vi.setSystemTime(new Date("2025-12-13T17:00:00.000Z")); + await withRestartedCron( + [ + { + id: "restart-backoff-last-run-status", + name: "lastRunStatus backoff pending", + enabled: true, + createdAtMs: Date.parse("2025-12-13T16:50:00.000Z"), + updatedAtMs: Date.parse("2025-12-13T16:59:45.000Z"), + schedule: { + kind: "every", + everyMs: 60_000, + anchorMs: Date.parse("2025-12-13T16:50:00.000Z"), + }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "do not run during lastRunStatus backoff" }, + state: { + nextRunAtMs: Date.parse("2025-12-13T16:59:50.000Z"), + lastRunAtMs: Date.parse("2025-12-13T16:59:45.000Z"), + lastDurationMs: 0, + lastRunStatus: "error", + consecutiveErrors: 1, + }, + }, + ], + async ({ cron, enqueueSystemEvent, requestHeartbeat }) => { + expect(enqueueSystemEvent).not.toHaveBeenCalled(); + expect(requestHeartbeat).not.toHaveBeenCalled(); + + const listedJobs = await cron.list({ includeDisabled: true }); + const updated = listedJobs.find((job) => job.id === "restart-backoff-last-run-status"); + expect(updated?.state.nextRunAtMs).toBeGreaterThan(Date.parse("2025-12-13T17:00:00.000Z")); + expect(updated?.state.lastRunStatus).toBe("error"); + expect(updated?.state.lastStatus).toBeUndefined(); + }, + ); + }); + it("replays missed cron slot after restart when error backoff has already elapsed", async () => { vi.setSystemTime(new Date("2025-12-13T04:02:00.000Z")); await withRestartedCron( diff --git a/src/cron/service/jobs.ts b/src/cron/service/jobs.ts index 18926222843..2f3ff06aabc 100644 --- a/src/cron/service/jobs.ts +++ b/src/cron/service/jobs.ts @@ -54,6 +54,10 @@ export function hasScheduledNextRunAtMs(value: unknown): value is number { return isFiniteTimestamp(value) && value > 0; } +export function resolveJobLastRunStatus(job: Pick) { + return job.state.lastRunStatus ?? job.state.lastStatus; +} + export function errorBackoffMs( consecutiveErrors: number, scheduleMs = DEFAULT_ERROR_BACKOFF_SCHEDULE_MS, @@ -66,7 +70,7 @@ export function resolveJobErrorBackoffUntilMs( job: CronJob, scheduleMs = DEFAULT_ERROR_BACKOFF_SCHEDULE_MS, ): number | undefined { - if (job.state.lastStatus !== "error" || !isFiniteTimestamp(job.state.lastRunAtMs)) { + if (resolveJobLastRunStatus(job) !== "error" || !isFiniteTimestamp(job.state.lastRunAtMs)) { return undefined; } const consecutiveErrorsRaw = job.state.consecutiveErrors; @@ -403,7 +407,7 @@ export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | und const atMs = parseAbsoluteTimeMs(job.schedule.at); // One-shot jobs stay due until they successfully finish, but if the // schedule was updated to a time after the last run, re-arm the job. - if (job.state.lastStatus === "ok" && job.state.lastRunAtMs) { + if (resolveJobLastRunStatus(job) === "ok" && job.state.lastRunAtMs) { if (atMs !== null && Number.isFinite(atMs) && atMs > job.state.lastRunAtMs) { return atMs; } @@ -565,7 +569,7 @@ function recomputeJobNextRunAtMs(params: { state: CronServiceState; job: CronJob let newNext = computeJobNextRunAtMs(params.job, params.nowMs); if ( params.job.schedule.kind !== "at" && - params.job.state.lastStatus === "error" && + resolveJobLastRunStatus(params.job) === "error" && isFiniteTimestamp(params.job.state.lastRunAtMs) ) { const backoffFloor = resolveJobErrorBackoffUntilMs( diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index b4dbe526c62..f7283f534cf 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -57,6 +57,7 @@ import { recomputeNextRunsForMaintenance, recordScheduleComputeError, resolveJobErrorBackoffUntilMs, + resolveJobLastRunStatus, resolveJobPayloadTextForMain, } from "./jobs.js"; import { locked } from "./locked.js"; @@ -968,14 +969,15 @@ function isRunnableJob(params: { if (typeof job.state.runningAtMs === "number") { return false; } - if (params.skipAtIfAlreadyRan && job.schedule.kind === "at" && job.state.lastStatus) { + const lastRunStatus = resolveJobLastRunStatus(job); + if (params.skipAtIfAlreadyRan && job.schedule.kind === "at" && lastRunStatus) { // One-shot with terminal status: skip unless it's a transient-error retry. // Retries have nextRunAtMs > lastRunAtMs (scheduled after the failed run) (#24355). // ok/skipped or error-without-retry always skip (#13845). const lastRun = job.state.lastRunAtMs; const nextRun = job.state.nextRunAtMs; if ( - job.state.lastStatus === "error" && + lastRunStatus === "error" && isJobEnabled(job) && typeof nextRun === "number" && typeof lastRun === "number" && @@ -1015,7 +1017,7 @@ function isRunnableJob(params: { } function isErrorBackoffPending(state: CronServiceState, job: CronJob, nowMs: number): boolean { - if (job.schedule.kind === "at" || job.state.lastStatus !== "error") { + if (job.schedule.kind === "at" || resolveJobLastRunStatus(job) !== "error") { return false; } const backoffUntilMs = resolveJobErrorBackoffUntilMs( diff --git a/src/gateway/server-methods/cron.ts b/src/gateway/server-methods/cron.ts index dd6d4924e95..28541f86157 100644 --- a/src/gateway/server-methods/cron.ts +++ b/src/gateway/server-methods/cron.ts @@ -160,18 +160,14 @@ function assertValidCronCreateDelivery(cfg: OpenClawConfig, jobCreate: CronJobCr }); } -function assertValidCronUpdateDelivery(params: { +function assertValidCronUpdatePatch(params: { cfg: OpenClawConfig; defaultAgentId?: string; - currentJob: CronJob | undefined; + currentJob: CronJob; patch: CronJobPatch; }) { - if (!params.currentJob || !("delivery" in params.patch)) { - return; - } - // Validate the post-patch job, not just the sparse patch, because delivery - // fields can be split across the existing job and the update payload. + // and payload/session fields can be split across existing state and patch. const nextJob = structuredClone(params.currentJob); applyJobPatch(nextJob, params.patch, { defaultAgentId: params.defaultAgentId, @@ -212,6 +208,25 @@ function cronRunLogPageFilters(params: CronRunsRequestParams) { }; } +function isCronInvalidRequestError(err: unknown): boolean { + const message = formatErrorMessage(err); + return ( + message.startsWith("unknown cron job id:") || + message.includes("cron job is missing sessionTarget") || + message.includes("invalid cron sessionTarget session id") || + message.includes('main cron jobs require payload.kind="systemEvent"') || + message.includes('isolated/current/session cron jobs require payload.kind="agentTurn"') || + message.includes('sessionTarget "main" is only valid for the default agent') || + message.includes('cron.update payload.kind="systemEvent" requires text') || + message.includes('cron.update payload.kind="agentTurn" requires message') || + message.includes("cron webhook delivery requires") || + message.includes("cron completion destination webhook requires") || + message.includes("cron failure destination webhook requires") || + message.includes("cron channel delivery config is only supported") || + message.includes("cron delivery.failureDestination is only supported") + ); +} + export const cronHandlers: GatewayRequestHandlers = { wake: ({ params, respond, context }) => { if (!validateWakeParams(params)) { @@ -392,7 +407,11 @@ export const cronHandlers: GatewayRequestHandlers = { try { job = await context.cron.add(jobCreate); } catch (err) { - if (!(err instanceof TypeError) && !(err instanceof RangeError)) { + if ( + !(err instanceof TypeError) && + !(err instanceof RangeError) && + !isCronInvalidRequestError(err) + ) { throw err; } respond( @@ -454,6 +473,11 @@ export const cronHandlers: GatewayRequestHandlers = { } const patch = p.patch as unknown as CronJobPatch; const cfg = context.getRuntimeConfig(); + const currentJob = await context.cron.readJob(jobId); + if (!currentJob) { + respondInvalidCronParams(respond, "cron.update", "id not found"); + return; + } if (patch.schedule) { const timestampValidation = validateScheduleTimestamp(patch.schedule); if (!timestampValidation.ok) { @@ -466,10 +490,10 @@ export const cronHandlers: GatewayRequestHandlers = { } } try { - assertValidCronUpdateDelivery({ + assertValidCronUpdatePatch({ cfg, defaultAgentId: context.cron.getDefaultAgentId(), - currentJob: context.cron.getJob(jobId), + currentJob, patch, }); } catch (err) { @@ -487,7 +511,11 @@ export const cronHandlers: GatewayRequestHandlers = { try { job = await context.cron.update(jobId, patch); } catch (err) { - if (!(err instanceof TypeError) && !(err instanceof RangeError)) { + if ( + !(err instanceof TypeError) && + !(err instanceof RangeError) && + !isCronInvalidRequestError(err) + ) { throw err; } respond( @@ -552,6 +580,10 @@ export const cronHandlers: GatewayRequestHandlers = { respond(true, { ok: true, ran: false, reason: "invalid-spec" }, undefined); return; } + if (isCronInvalidRequestError(error)) { + respondInvalidCronParams(respond, "cron.run", formatErrorMessage(error)); + return; + } throw error; } respond(true, result, undefined); diff --git a/src/gateway/server-methods/cron.validation.test.ts b/src/gateway/server-methods/cron.validation.test.ts index 3cf521e2027..462b5943834 100644 --- a/src/gateway/server-methods/cron.validation.test.ts +++ b/src/gateway/server-methods/cron.validation.test.ts @@ -76,6 +76,7 @@ function createCronContext(currentJob?: CronJob) { add: vi.fn(async () => ({ id: "cron-1" })), update: vi.fn(async () => ({ id: "cron-1" })), remove: vi.fn(async () => ({ ok: true, removed: true })), + enqueueRun: vi.fn(async () => ({ ok: true, enqueued: true, runId: "run-1" })), getDefaultAgentId: vi.fn(() => "main"), getJob: vi.fn(() => currentJob), wake: vi.fn(() => ({ ok: true }) as const), @@ -116,7 +117,7 @@ async function invokeCronGet(params: Record, currentJob?: CronJ return await invokeCron("cron.get", params, { currentJob }); } -async function invokeCronUpdate(params: Record, currentJob: CronJob) { +async function invokeCronUpdate(params: Record, currentJob?: CronJob) { return await invokeCron("cron.update", params, { currentJob }); } @@ -562,6 +563,51 @@ describe("cron method validation", () => { expectResponseError(respond, { messageIncludes: "delivery.channel is required" }); }); + it("loads the cron job before validating update delivery patches", async () => { + getRuntimeConfig.mockReturnValue({ + session: { + mainKey: "main", + }, + channels: { + telegram: { + botToken: "telegram-token", + }, + slack: { + botToken: "xoxb-slack-token", + appToken: "xapp-slack-token", + }, + }, + plugins: { + entries: { + telegram: { enabled: true }, + slack: { enabled: true }, + }, + }, + } as OpenClawConfig); + + const context = createCronContext(createCronJob()); + context.cron.getJob.mockReturnValue(undefined); + const respond = vi.fn(); + await cronHandlers["cron.update"]({ + req: {} as never, + params: { + id: "cron-1", + patch: { + delivery: { mode: "announce" }, + }, + } as never, + respond: respond as never, + context: context as never, + client: null, + isWebchatConnect: () => false, + }); + + expect(context.cron.readJob).toHaveBeenCalledWith("cron-1"); + expect(context.cron.getJob).not.toHaveBeenCalled(); + expect(context.cron.update).not.toHaveBeenCalled(); + expectResponseError(respond, { messageIncludes: "delivery.channel is required" }); + }); + it("rejects target ids mistakenly supplied as delivery.channel providers", async () => { setRuntimeConfig({ session: { @@ -610,6 +656,37 @@ describe("cron method validation", () => { expectResponseError(respond, { code: "INVALID_REQUEST", messageIncludes: "CronPattern" }); }); + it("returns INVALID_REQUEST when cron.add rejects an incompatible main agent", async () => { + const context = createCronContext(); + context.cron.add.mockRejectedValueOnce( + new Error( + 'cron: sessionTarget "main" is only valid for the default agent. Use sessionTarget "isolated" with payload.kind "agentTurn" for non-default agents (agentId: worker)', + ), + ); + const respond = vi.fn(); + await cronHandlers["cron.add"]({ + req: {} as never, + params: { + name: "bad-main-agent", + enabled: true, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "ping" }, + agentId: "worker", + } as never, + respond: respond as never, + context: context as never, + client: null, + isWebchatConnect: () => false, + }); + + expectResponseError(respond, { + code: "INVALID_REQUEST", + messageIncludes: 'sessionTarget "main" is only valid', + }); + }); + it("returns INVALID_REQUEST when cron.update throws a croner parse error (#74066)", async () => { const existingJob = createCronJob(); const context = createCronContext(existingJob); @@ -630,6 +707,59 @@ describe("cron method validation", () => { expectResponseError(respond, { code: "INVALID_REQUEST", messageIncludes: "CronPattern" }); }); + it("returns INVALID_REQUEST when cron.update cannot find the job", async () => { + const { context, respond } = await invokeCronUpdate({ + id: "missing", + patch: { enabled: false }, + }); + + expect(context.cron.update).not.toHaveBeenCalled(); + expectResponseError(respond, { + code: "INVALID_REQUEST", + messageIncludes: "invalid cron.update params: id not found", + }); + }); + + it("rejects cron.update payload/session mismatches before calling the service update", async () => { + const { context, respond } = await invokeCronUpdate( + { + id: "cron-1", + patch: { + payload: { kind: "systemEvent", text: "wake main" }, + }, + }, + createCronJob({ + sessionTarget: "isolated", + payload: { kind: "agentTurn", message: "hello" }, + }), + ); + + expect(context.cron.update).not.toHaveBeenCalled(); + expectResponseError(respond, { + code: "INVALID_REQUEST", + messageIncludes: 'isolated/current/session cron jobs require payload.kind="agentTurn"', + }); + }); + + it("returns INVALID_REQUEST when cron.run cannot find the job", async () => { + const context = createCronContext(); + context.cron.enqueueRun.mockRejectedValueOnce(new Error("unknown cron job id: missing")); + const respond = vi.fn(); + await cronHandlers["cron.run"]({ + req: {} as never, + params: { id: "missing" } as never, + respond: respond as never, + context: context as never, + client: null, + isWebchatConnect: () => false, + }); + + expectResponseError(respond, { + code: "INVALID_REQUEST", + messageIncludes: "unknown cron job id: missing", + }); + }); + it("re-throws non-parse errors from cron.add instead of masking as INVALID_REQUEST", async () => { const context = createCronContext(); context.cron.add.mockRejectedValueOnce(new Error("DB write failed"));