diff --git a/CHANGELOG.md b/CHANGELOG.md index d543501ae4f..4e03fd614de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,8 @@ Docs: https://docs.openclaw.ai - Google video generation: download direct MLDev Veo `video.uri` results instead of passing them through the Files API path, fixing 404s after successful generation/polling. Fixes #71200. Thanks @panhaishan. - Google video generation: fall back to the REST `predictLongRunning` Veo endpoint for text-only SDK 404s while keeping reference image/video generation on the SDK path. Fixes #62309 and #63008. (#62343) Thanks @leoleedev. - MiniMax music generation: switch the bundled default model from the unsupported `music-2.5+` id to the current `music-2.6` API model. Fixes #64870 and addresses the music default from #62315. Thanks @noahclanman and @edwardzheng1. +- Cron: record jobs interrupted by a gateway restart as failed at their original `runningAtMs`, skip unsafe startup replay, and disable interrupted one-shot jobs so they show a visible failure instead of silently disappearing or duplicating work. Fixes #59056, #61343, #63657, and #59301. Thanks @ponchoooPenguin, @daemic24, @myradon, and @hikiwibot. +- Cron tool: recover flat top-level schedule shorthand such as `cron`, `tz`, and `staggerMs` before gateway validation, so model-generated cron add/update calls preserve cron jitter settings. Thanks @tyxben. - Cron: hydrate flat legacy job rows with top-level `cron`, `tz`, `session`, and `message` fields into canonical schedule, target, and payload objects before startup recomputes run times. Fixes #43351. - Agents/replies: let pending group chat history trigger bare mentioned turns without treating metadata-only inbound context as user input. Fixes #71489. (#71520) Thanks @SymbolStar. - Google media generation: strip a configured trailing `/v1beta` from Google music/video provider base URLs before calling the Google GenAI SDK, preventing doubled `/v1beta/v1beta` paths. Fixes #63240. (#63258) Thanks @Hybirdss. diff --git a/src/agents/tools/cron-tool.flat-params.test.ts b/src/agents/tools/cron-tool.flat-params.test.ts index 8d2688ffcfa..c93a68a7bd4 100644 --- a/src/agents/tools/cron-tool.flat-params.test.ts +++ b/src/agents/tools/cron-tool.flat-params.test.ts @@ -36,4 +36,66 @@ describe("cron tool flat-params", () => { expect(method).toBe("cron.add"); expect(params.sessionKey).toBe("agent:main:telegram:group:-100123:topic:99"); }); + + it("recovers flat cron schedule shorthand for add", async () => { + const tool = createCronTool(undefined, { callGatewayTool: callGatewayToolMock }); + + await tool.execute("call-flat-cron-add", { + action: "add", + name: "hourly report", + cron: "0 * * * *", + tz: "UTC", + staggerMs: 5000, + message: "send report", + }); + + const [method, _gatewayOpts, params] = callGatewayToolMock.mock.calls[0] as [ + string, + unknown, + { + schedule?: unknown; + payload?: unknown; + }, + ]; + expect(method).toBe("cron.add"); + expect(params.schedule).toEqual({ + kind: "cron", + expr: "0 * * * *", + tz: "UTC", + staggerMs: 5000, + }); + expect(params.payload).toEqual({ + kind: "agentTurn", + message: "send report", + }); + }); + + it("recovers flat cron schedule shorthand for update", async () => { + const tool = createCronTool(undefined, { callGatewayTool: callGatewayToolMock }); + + await tool.execute("call-flat-cron-update", { + action: "update", + jobId: "job-123", + cron: "15 8 * * 1-5", + tz: "America/Los_Angeles", + staggerMs: 30_000, + }); + + const [method, _gatewayOpts, params] = callGatewayToolMock.mock.calls[0] as [ + string, + unknown, + { + id?: string; + patch?: { schedule?: unknown }; + }, + ]; + expect(method).toBe("cron.update"); + expect(params.id).toBe("job-123"); + expect(params.patch?.schedule).toEqual({ + kind: "cron", + expr: "15 8 * * 1-5", + tz: "America/Los_Angeles", + staggerMs: 30_000, + }); + }); }); diff --git a/src/agents/tools/cron-tool.ts b/src/agents/tools/cron-tool.ts index c8d3a6288ce..00ed4f4d754 100644 --- a/src/agents/tools/cron-tool.ts +++ b/src/agents/tools/cron-tool.ts @@ -39,6 +39,20 @@ const CRON_FLAT_PAYLOAD_KEYS = [ "lightContext", "allowUnsafeExternalContent", ] as const; +const CRON_FLAT_SCHEDULE_KEYS = [ + "kind", + "at", + "atMs", + "every", + "everyMs", + "anchorMs", + "cron", + "expr", + "tz", + "stagger", + "staggerMs", + "exact", +] as const; const CRON_RECOVERABLE_OBJECT_KEYS: ReadonlySet = new Set([ "name", "schedule", @@ -53,6 +67,7 @@ const CRON_RECOVERABLE_OBJECT_KEYS: ReadonlySet = new Set([ "sessionKey", "failureAlert", ...CRON_FLAT_PAYLOAD_KEYS, + ...CRON_FLAT_SCHEDULE_KEYS, ]); const REMINDER_CONTEXT_MESSAGES_MAX = 10; @@ -76,12 +91,29 @@ function recoverCronObjectFromFlatParams(params: Record): { found = true; } } + if (value.everyMs === undefined && value.every !== undefined) { + value.everyMs = value.every; + } + if (value.staggerMs === undefined && value.stagger !== undefined) { + value.staggerMs = value.stagger; + } + if (value.exact === true && value.staggerMs === undefined) { + value.staggerMs = 0; + } + delete value.every; + delete value.stagger; + delete value.exact; return { found, value }; } function hasCronCreateSignal(value: Record): boolean { return ( value.schedule !== undefined || + value.at !== undefined || + value.atMs !== undefined || + value.everyMs !== undefined || + value.cron !== undefined || + value.expr !== undefined || value.payload !== undefined || value.message !== undefined || value.text !== undefined diff --git a/src/cron/service.restart-catchup.test.ts b/src/cron/service.restart-catchup.test.ts index 6876af32c6a..922ad2828e4 100644 --- a/src/cron/service.restart-catchup.test.ts +++ b/src/cron/service.restart-catchup.test.ts @@ -3,6 +3,7 @@ import path from "node:path"; import { describe, expect, it, vi } from "vitest"; import { CronService } from "./service.js"; import { setupCronServiceSuite } from "./service.test-harness.js"; +import type { CronEvent } from "./service/state.js"; import { createCronServiceState } from "./service/state.js"; import { runMissedJobs } from "./service/timer.js"; @@ -21,6 +22,7 @@ describe("CronService restart catch-up", () => { storePath: string; enqueueSystemEvent: ReturnType; requestHeartbeatNow: ReturnType; + onEvent?: ReturnType; }) { return new CronService({ storePath: params.storePath, @@ -29,6 +31,7 @@ describe("CronService restart catch-up", () => { enqueueSystemEvent: params.enqueueSystemEvent as never, requestHeartbeatNow: params.requestHeartbeatNow as never, runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })) as never, + onEvent: params.onEvent as ((evt: CronEvent) => void) | undefined, }); } @@ -53,11 +56,13 @@ describe("CronService restart catch-up", () => { cron: CronService; enqueueSystemEvent: ReturnType; requestHeartbeatNow: ReturnType; + onEvent: ReturnType; }) => Promise, ) { const store = await makeStorePath(); const enqueueSystemEvent = vi.fn(); const requestHeartbeatNow = vi.fn(); + const onEvent = vi.fn(); await writeStoreJobs(store.storePath, jobs); @@ -65,11 +70,12 @@ describe("CronService restart catch-up", () => { storePath: store.storePath, enqueueSystemEvent, requestHeartbeatNow, + onEvent, }); try { await cron.start(); - await run({ cron, enqueueSystemEvent, requestHeartbeatNow }); + await run({ cron, enqueueSystemEvent, requestHeartbeatNow, onEvent }); } finally { cron.stop(); await store.cleanup(); @@ -115,7 +121,7 @@ describe("CronService restart catch-up", () => { ); }); - it("replays interrupted recurring job on first restart (#60495)", async () => { + it("marks interrupted recurring jobs failed instead of replaying them on startup", async () => { const dueAt = Date.parse("2025-12-13T16:00:00.000Z"); const staleRunningAt = Date.parse("2025-12-13T16:30:00.000Z"); @@ -137,23 +143,32 @@ describe("CronService restart catch-up", () => { }, }, ], - async ({ cron, enqueueSystemEvent, requestHeartbeatNow }) => { + async ({ cron, enqueueSystemEvent, requestHeartbeatNow, onEvent }) => { expect(noopLogger.warn).toHaveBeenCalledWith( expect.objectContaining({ jobId: "restart-stale-running" }), - "cron: clearing stale running marker on startup", + "cron: marking interrupted running job failed on startup", ); - expect(enqueueSystemEvent).toHaveBeenCalledWith( - "resume stale marker", - expect.objectContaining({ agentId: undefined }), - ); - expect(requestHeartbeatNow).toHaveBeenCalled(); + expect(enqueueSystemEvent).not.toHaveBeenCalled(); + expect(requestHeartbeatNow).not.toHaveBeenCalled(); const listedJobs = await cron.list({ includeDisabled: true }); const updated = listedJobs.find((job) => job.id === "restart-stale-running"); expect(updated?.state.runningAtMs).toBeUndefined(); - expect(updated?.state.lastStatus).toBe("ok"); - expect(updated?.state.lastRunAtMs).toBe(Date.parse("2025-12-13T17:00:00.000Z")); + expect(updated?.state.lastStatus).toBe("error"); + expect(updated?.state.lastRunStatus).toBe("error"); + expect(updated?.state.lastRunAtMs).toBe(staleRunningAt); + expect(updated?.state.lastError).toBe("cron: job interrupted by gateway restart"); + expect(updated?.state.nextRunAtMs).toBeGreaterThan(Date.parse("2025-12-13T17:00:00.000Z")); + expect(onEvent).toHaveBeenCalledWith( + expect.objectContaining({ + action: "finished", + jobId: "restart-stale-running", + status: "error", + error: "cron: job interrupted by gateway restart", + runAtMs: staleRunningAt, + }), + ); }, ); }); @@ -194,7 +209,7 @@ describe("CronService restart catch-up", () => { ); }); - it("does not replay interrupted one-shot jobs on startup", async () => { + it("marks interrupted one-shot jobs failed and disabled on startup", async () => { const dueAt = Date.parse("2025-12-13T16:00:00.000Z"); const staleRunningAt = Date.parse("2025-12-13T16:30:00.000Z"); @@ -216,13 +231,28 @@ describe("CronService restart catch-up", () => { }, }, ], - async ({ cron, enqueueSystemEvent, requestHeartbeatNow }) => { + async ({ cron, enqueueSystemEvent, requestHeartbeatNow, onEvent }) => { expect(enqueueSystemEvent).not.toHaveBeenCalled(); expect(requestHeartbeatNow).not.toHaveBeenCalled(); const listedJobs = await cron.list({ includeDisabled: true }); const updated = listedJobs.find((job) => job.id === "restart-stale-one-shot"); + expect(updated?.enabled).toBe(false); expect(updated?.state.runningAtMs).toBeUndefined(); + expect(updated?.state.lastStatus).toBe("error"); + expect(updated?.state.lastRunStatus).toBe("error"); + expect(updated?.state.lastRunAtMs).toBe(staleRunningAt); + expect(updated?.state.nextRunAtMs).toBeUndefined(); + expect(updated?.state.lastError).toBe("cron: job interrupted by gateway restart"); + expect(onEvent).toHaveBeenCalledWith( + expect.objectContaining({ + action: "finished", + jobId: "restart-stale-one-shot", + status: "error", + error: "cron: job interrupted by gateway restart", + runAtMs: staleRunningAt, + }), + ); }, ); }); diff --git a/src/cron/service/ops.test.ts b/src/cron/service/ops.test.ts index 87e87824900..508419d1631 100644 --- a/src/cron/service/ops.test.ts +++ b/src/cron/service/ops.test.ts @@ -128,7 +128,7 @@ function createMissedIsolatedJob(now: number): CronJob { } describe("cron service ops seam coverage", () => { - it("start clears stale running markers, replays interrupted recurring jobs, persists, and arms the timer (#60495)", async () => { + it("start marks interrupted running jobs failed, persists, and arms the timer", async () => { const { storePath } = await makeStorePath(); const now = Date.parse("2026-03-23T12:00:00.000Z"); const enqueueSystemEvent = vi.fn(); @@ -154,11 +154,10 @@ describe("cron service ops seam coverage", () => { expect(logger.warn).toHaveBeenCalledWith( expect.objectContaining({ jobId: "startup-interrupted" }), - "cron: clearing stale running marker on startup", + "cron: marking interrupted running job failed on startup", ); - // Interrupted recurring jobs are now replayed on first restart (#60495) - expect(enqueueSystemEvent).toHaveBeenCalled(); - expect(requestHeartbeatNow).toHaveBeenCalled(); + expect(enqueueSystemEvent).not.toHaveBeenCalled(); + expect(requestHeartbeatNow).not.toHaveBeenCalled(); expect(state.timer).not.toBeNull(); const persisted = (await loadCronStore(storePath)) as { @@ -167,7 +166,10 @@ describe("cron service ops seam coverage", () => { const job = persisted.jobs[0]; expect(job).toBeDefined(); expect(job?.state.runningAtMs).toBeUndefined(); - expect(job?.state.lastStatus).toBe("ok"); + expect(job?.state.lastStatus).toBe("error"); + expect(job?.state.lastRunStatus).toBe("error"); + expect(job?.state.lastRunAtMs).toBe(now - 30 * 60_000); + expect(job?.state.lastError).toBe("cron: job interrupted by gateway restart"); expect((job?.state.nextRunAtMs ?? 0) > now).toBe(true); const delays = timeoutSpy.mock.calls diff --git a/src/cron/service/ops.ts b/src/cron/service/ops.ts index 2c61998c02c..86422a519da 100644 --- a/src/cron/service/ops.ts +++ b/src/cron/service/ops.ts @@ -42,6 +42,55 @@ import { wake, } from "./timer.js"; +const STARTUP_INTERRUPTED_ERROR = "cron: job interrupted by gateway restart"; + +type InterruptedStartupRun = { + jobId: string; + runAtMs: number; + durationMs: number; +}; + +function markInterruptedStartupRun(params: { + state: CronServiceState; + job: CronJob; + runningAtMs: number; + nowMs: number; +}): InterruptedStartupRun { + const { job, runningAtMs, nowMs } = params; + const previousErrors = + typeof job.state.consecutiveErrors === "number" && Number.isFinite(job.state.consecutiveErrors) + ? Math.max(0, Math.floor(job.state.consecutiveErrors)) + : 0; + + params.state.deps.log.warn( + { jobId: job.id, runningAtMs }, + "cron: marking interrupted running job failed on startup", + ); + + job.state.runningAtMs = undefined; + job.state.lastRunAtMs = runningAtMs; + job.state.lastRunStatus = "error"; + job.state.lastStatus = "error"; + job.state.lastError = STARTUP_INTERRUPTED_ERROR; + job.state.lastDurationMs = Math.max(0, nowMs - runningAtMs); + job.state.consecutiveErrors = previousErrors + 1; + job.state.lastDelivered = false; + job.state.lastDeliveryStatus = "unknown"; + job.state.lastDeliveryError = STARTUP_INTERRUPTED_ERROR; + job.state.nextRunAtMs = undefined; + job.updatedAtMs = nowMs; + + if (job.schedule.kind === "at") { + job.enabled = false; + } + + return { + jobId: job.id, + runAtMs: runningAtMs, + durationMs: job.state.lastDurationMs, + }; +} + function mergeManualRunSnapshotAfterReload(params: { state: CronServiceState; jobId: string; @@ -90,35 +139,34 @@ export async function start(state: CronServiceState) { return; } - const interruptedOneShotIds = new Set(); - let clearedAnyRunningMarker = false; + const interruptedJobIds = new Set(); + const interruptedRuns: InterruptedStartupRun[] = []; + let markedAnyInterruptedRun = false; await locked(state, async () => { await ensureLoaded(state, { skipRecompute: true }); const jobs = state.store?.jobs ?? []; for (const job of jobs) { job.state ??= {}; if (typeof job.state.runningAtMs === "number") { - state.deps.log.warn( - { jobId: job.id, runningAtMs: job.state.runningAtMs }, - "cron: clearing stale running marker on startup", - ); - job.state.runningAtMs = undefined; - clearedAnyRunningMarker = true; - // One-shot jobs are not retried after interruption; recurring jobs - // (cron/every) are eligible for startup catch-up so they don't - // require a second restart to recover (#60495). - if (job.schedule.kind === "at") { - interruptedOneShotIds.add(job.id); - } + const nowMs = state.deps.nowMs(); + const interrupted = markInterruptedStartupRun({ + state, + job, + runningAtMs: job.state.runningAtMs, + nowMs, + }); + interruptedJobIds.add(job.id); + interruptedRuns.push(interrupted); + markedAnyInterruptedRun = true; } } - if (clearedAnyRunningMarker) { + if (markedAnyInterruptedRun) { await persist(state); } }); await runMissedJobs(state, { - skipJobIds: interruptedOneShotIds.size > 0 ? interruptedOneShotIds : undefined, + skipJobIds: interruptedJobIds.size > 0 ? interruptedJobIds : undefined, }); await locked(state, async () => { @@ -130,6 +178,21 @@ export async function start(state: CronServiceState) { if (changed) { await persist(state); } + for (const interrupted of interruptedRuns) { + const job = state.store?.jobs.find((entry) => entry.id === interrupted.jobId); + emit(state, { + jobId: interrupted.jobId, + action: "finished", + status: "error", + error: STARTUP_INTERRUPTED_ERROR, + delivered: false, + deliveryStatus: "unknown", + deliveryError: STARTUP_INTERRUPTED_ERROR, + runAtMs: interrupted.runAtMs, + durationMs: interrupted.durationMs, + nextRunAtMs: job?.state.nextRunAtMs, + }); + } armTimer(state); state.deps.log.info( {