fix(cron): record interrupted startup runs

* fix(cron): record interrupted startup runs

* test(cron): update interrupted startup expectations
This commit is contained in:
Vincent Koc
2026-04-25 04:28:11 -07:00
committed by GitHub
parent fc5920fb51
commit 924271385b
6 changed files with 226 additions and 35 deletions

View File

@@ -27,6 +27,8 @@ Docs: https://docs.openclaw.ai
- Google video generation: download direct MLDev Veo `video.uri` results instead of passing them through the Files API path, fixing 404s after successful generation/polling. Fixes #71200. Thanks @panhaishan.
- Google video generation: fall back to the REST `predictLongRunning` Veo endpoint for text-only SDK 404s while keeping reference image/video generation on the SDK path. Fixes #62309 and #63008. (#62343) Thanks @leoleedev.
- MiniMax music generation: switch the bundled default model from the unsupported `music-2.5+` id to the current `music-2.6` API model. Fixes #64870 and addresses the music default from #62315. Thanks @noahclanman and @edwardzheng1.
- Cron: record jobs interrupted by a gateway restart as failed at their original `runningAtMs`, skip unsafe startup replay, and disable interrupted one-shot jobs so they show a visible failure instead of silently disappearing or duplicating work. Fixes #59056, #61343, #63657, and #59301. Thanks @ponchoooPenguin, @daemic24, @myradon, and @hikiwibot.
- Cron tool: recover flat top-level schedule shorthand such as `cron`, `tz`, and `staggerMs` before gateway validation, so model-generated cron add/update calls preserve cron jitter settings. Thanks @tyxben.
- Cron: hydrate flat legacy job rows with top-level `cron`, `tz`, `session`, and `message` fields into canonical schedule, target, and payload objects before startup recomputes run times. Fixes #43351.
- Agents/replies: let pending group chat history trigger bare mentioned turns without treating metadata-only inbound context as user input. Fixes #71489. (#71520) Thanks @SymbolStar.
- Google media generation: strip a configured trailing `/v1beta` from Google music/video provider base URLs before calling the Google GenAI SDK, preventing doubled `/v1beta/v1beta` paths. Fixes #63240. (#63258) Thanks @Hybirdss.

View File

@@ -36,4 +36,66 @@ describe("cron tool flat-params", () => {
expect(method).toBe("cron.add");
expect(params.sessionKey).toBe("agent:main:telegram:group:-100123:topic:99");
});
it("recovers flat cron schedule shorthand for add", async () => {
const tool = createCronTool(undefined, { callGatewayTool: callGatewayToolMock });
await tool.execute("call-flat-cron-add", {
action: "add",
name: "hourly report",
cron: "0 * * * *",
tz: "UTC",
staggerMs: 5000,
message: "send report",
});
const [method, _gatewayOpts, params] = callGatewayToolMock.mock.calls[0] as [
string,
unknown,
{
schedule?: unknown;
payload?: unknown;
},
];
expect(method).toBe("cron.add");
expect(params.schedule).toEqual({
kind: "cron",
expr: "0 * * * *",
tz: "UTC",
staggerMs: 5000,
});
expect(params.payload).toEqual({
kind: "agentTurn",
message: "send report",
});
});
it("recovers flat cron schedule shorthand for update", async () => {
const tool = createCronTool(undefined, { callGatewayTool: callGatewayToolMock });
await tool.execute("call-flat-cron-update", {
action: "update",
jobId: "job-123",
cron: "15 8 * * 1-5",
tz: "America/Los_Angeles",
staggerMs: 30_000,
});
const [method, _gatewayOpts, params] = callGatewayToolMock.mock.calls[0] as [
string,
unknown,
{
id?: string;
patch?: { schedule?: unknown };
},
];
expect(method).toBe("cron.update");
expect(params.id).toBe("job-123");
expect(params.patch?.schedule).toEqual({
kind: "cron",
expr: "15 8 * * 1-5",
tz: "America/Los_Angeles",
staggerMs: 30_000,
});
});
});

View File

@@ -39,6 +39,20 @@ const CRON_FLAT_PAYLOAD_KEYS = [
"lightContext",
"allowUnsafeExternalContent",
] as const;
const CRON_FLAT_SCHEDULE_KEYS = [
"kind",
"at",
"atMs",
"every",
"everyMs",
"anchorMs",
"cron",
"expr",
"tz",
"stagger",
"staggerMs",
"exact",
] as const;
const CRON_RECOVERABLE_OBJECT_KEYS: ReadonlySet<string> = new Set([
"name",
"schedule",
@@ -53,6 +67,7 @@ const CRON_RECOVERABLE_OBJECT_KEYS: ReadonlySet<string> = new Set([
"sessionKey",
"failureAlert",
...CRON_FLAT_PAYLOAD_KEYS,
...CRON_FLAT_SCHEDULE_KEYS,
]);
const REMINDER_CONTEXT_MESSAGES_MAX = 10;
@@ -76,12 +91,29 @@ function recoverCronObjectFromFlatParams(params: Record<string, unknown>): {
found = true;
}
}
if (value.everyMs === undefined && value.every !== undefined) {
value.everyMs = value.every;
}
if (value.staggerMs === undefined && value.stagger !== undefined) {
value.staggerMs = value.stagger;
}
if (value.exact === true && value.staggerMs === undefined) {
value.staggerMs = 0;
}
delete value.every;
delete value.stagger;
delete value.exact;
return { found, value };
}
function hasCronCreateSignal(value: Record<string, unknown>): boolean {
return (
value.schedule !== undefined ||
value.at !== undefined ||
value.atMs !== undefined ||
value.everyMs !== undefined ||
value.cron !== undefined ||
value.expr !== undefined ||
value.payload !== undefined ||
value.message !== undefined ||
value.text !== undefined

View File

@@ -3,6 +3,7 @@ import path from "node:path";
import { describe, expect, it, vi } from "vitest";
import { CronService } from "./service.js";
import { setupCronServiceSuite } from "./service.test-harness.js";
import type { CronEvent } from "./service/state.js";
import { createCronServiceState } from "./service/state.js";
import { runMissedJobs } from "./service/timer.js";
@@ -21,6 +22,7 @@ describe("CronService restart catch-up", () => {
storePath: string;
enqueueSystemEvent: ReturnType<typeof vi.fn>;
requestHeartbeatNow: ReturnType<typeof vi.fn>;
onEvent?: ReturnType<typeof vi.fn>;
}) {
return new CronService({
storePath: params.storePath,
@@ -29,6 +31,7 @@ describe("CronService restart catch-up", () => {
enqueueSystemEvent: params.enqueueSystemEvent as never,
requestHeartbeatNow: params.requestHeartbeatNow as never,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })) as never,
onEvent: params.onEvent as ((evt: CronEvent) => void) | undefined,
});
}
@@ -53,11 +56,13 @@ describe("CronService restart catch-up", () => {
cron: CronService;
enqueueSystemEvent: ReturnType<typeof vi.fn>;
requestHeartbeatNow: ReturnType<typeof vi.fn>;
onEvent: ReturnType<typeof vi.fn>;
}) => Promise<void>,
) {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
const onEvent = vi.fn();
await writeStoreJobs(store.storePath, jobs);
@@ -65,11 +70,12 @@ describe("CronService restart catch-up", () => {
storePath: store.storePath,
enqueueSystemEvent,
requestHeartbeatNow,
onEvent,
});
try {
await cron.start();
await run({ cron, enqueueSystemEvent, requestHeartbeatNow });
await run({ cron, enqueueSystemEvent, requestHeartbeatNow, onEvent });
} finally {
cron.stop();
await store.cleanup();
@@ -115,7 +121,7 @@ describe("CronService restart catch-up", () => {
);
});
it("replays interrupted recurring job on first restart (#60495)", async () => {
it("marks interrupted recurring jobs failed instead of replaying them on startup", async () => {
const dueAt = Date.parse("2025-12-13T16:00:00.000Z");
const staleRunningAt = Date.parse("2025-12-13T16:30:00.000Z");
@@ -137,23 +143,32 @@ describe("CronService restart catch-up", () => {
},
},
],
async ({ cron, enqueueSystemEvent, requestHeartbeatNow }) => {
async ({ cron, enqueueSystemEvent, requestHeartbeatNow, onEvent }) => {
expect(noopLogger.warn).toHaveBeenCalledWith(
expect.objectContaining({ jobId: "restart-stale-running" }),
"cron: clearing stale running marker on startup",
"cron: marking interrupted running job failed on startup",
);
expect(enqueueSystemEvent).toHaveBeenCalledWith(
"resume stale marker",
expect.objectContaining({ agentId: undefined }),
);
expect(requestHeartbeatNow).toHaveBeenCalled();
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeatNow).not.toHaveBeenCalled();
const listedJobs = await cron.list({ includeDisabled: true });
const updated = listedJobs.find((job) => job.id === "restart-stale-running");
expect(updated?.state.runningAtMs).toBeUndefined();
expect(updated?.state.lastStatus).toBe("ok");
expect(updated?.state.lastRunAtMs).toBe(Date.parse("2025-12-13T17:00:00.000Z"));
expect(updated?.state.lastStatus).toBe("error");
expect(updated?.state.lastRunStatus).toBe("error");
expect(updated?.state.lastRunAtMs).toBe(staleRunningAt);
expect(updated?.state.lastError).toBe("cron: job interrupted by gateway restart");
expect(updated?.state.nextRunAtMs).toBeGreaterThan(Date.parse("2025-12-13T17:00:00.000Z"));
expect(onEvent).toHaveBeenCalledWith(
expect.objectContaining({
action: "finished",
jobId: "restart-stale-running",
status: "error",
error: "cron: job interrupted by gateway restart",
runAtMs: staleRunningAt,
}),
);
},
);
});
@@ -194,7 +209,7 @@ describe("CronService restart catch-up", () => {
);
});
it("does not replay interrupted one-shot jobs on startup", async () => {
it("marks interrupted one-shot jobs failed and disabled on startup", async () => {
const dueAt = Date.parse("2025-12-13T16:00:00.000Z");
const staleRunningAt = Date.parse("2025-12-13T16:30:00.000Z");
@@ -216,13 +231,28 @@ describe("CronService restart catch-up", () => {
},
},
],
async ({ cron, enqueueSystemEvent, requestHeartbeatNow }) => {
async ({ cron, enqueueSystemEvent, requestHeartbeatNow, onEvent }) => {
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeatNow).not.toHaveBeenCalled();
const listedJobs = await cron.list({ includeDisabled: true });
const updated = listedJobs.find((job) => job.id === "restart-stale-one-shot");
expect(updated?.enabled).toBe(false);
expect(updated?.state.runningAtMs).toBeUndefined();
expect(updated?.state.lastStatus).toBe("error");
expect(updated?.state.lastRunStatus).toBe("error");
expect(updated?.state.lastRunAtMs).toBe(staleRunningAt);
expect(updated?.state.nextRunAtMs).toBeUndefined();
expect(updated?.state.lastError).toBe("cron: job interrupted by gateway restart");
expect(onEvent).toHaveBeenCalledWith(
expect.objectContaining({
action: "finished",
jobId: "restart-stale-one-shot",
status: "error",
error: "cron: job interrupted by gateway restart",
runAtMs: staleRunningAt,
}),
);
},
);
});

View File

@@ -128,7 +128,7 @@ function createMissedIsolatedJob(now: number): CronJob {
}
describe("cron service ops seam coverage", () => {
it("start clears stale running markers, replays interrupted recurring jobs, persists, and arms the timer (#60495)", async () => {
it("start marks interrupted running jobs failed, persists, and arms the timer", async () => {
const { storePath } = await makeStorePath();
const now = Date.parse("2026-03-23T12:00:00.000Z");
const enqueueSystemEvent = vi.fn();
@@ -154,11 +154,10 @@ describe("cron service ops seam coverage", () => {
expect(logger.warn).toHaveBeenCalledWith(
expect.objectContaining({ jobId: "startup-interrupted" }),
"cron: clearing stale running marker on startup",
"cron: marking interrupted running job failed on startup",
);
// Interrupted recurring jobs are now replayed on first restart (#60495)
expect(enqueueSystemEvent).toHaveBeenCalled();
expect(requestHeartbeatNow).toHaveBeenCalled();
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeatNow).not.toHaveBeenCalled();
expect(state.timer).not.toBeNull();
const persisted = (await loadCronStore(storePath)) as {
@@ -167,7 +166,10 @@ describe("cron service ops seam coverage", () => {
const job = persisted.jobs[0];
expect(job).toBeDefined();
expect(job?.state.runningAtMs).toBeUndefined();
expect(job?.state.lastStatus).toBe("ok");
expect(job?.state.lastStatus).toBe("error");
expect(job?.state.lastRunStatus).toBe("error");
expect(job?.state.lastRunAtMs).toBe(now - 30 * 60_000);
expect(job?.state.lastError).toBe("cron: job interrupted by gateway restart");
expect((job?.state.nextRunAtMs ?? 0) > now).toBe(true);
const delays = timeoutSpy.mock.calls

View File

@@ -42,6 +42,55 @@ import {
wake,
} from "./timer.js";
const STARTUP_INTERRUPTED_ERROR = "cron: job interrupted by gateway restart";
type InterruptedStartupRun = {
jobId: string;
runAtMs: number;
durationMs: number;
};
function markInterruptedStartupRun(params: {
state: CronServiceState;
job: CronJob;
runningAtMs: number;
nowMs: number;
}): InterruptedStartupRun {
const { job, runningAtMs, nowMs } = params;
const previousErrors =
typeof job.state.consecutiveErrors === "number" && Number.isFinite(job.state.consecutiveErrors)
? Math.max(0, Math.floor(job.state.consecutiveErrors))
: 0;
params.state.deps.log.warn(
{ jobId: job.id, runningAtMs },
"cron: marking interrupted running job failed on startup",
);
job.state.runningAtMs = undefined;
job.state.lastRunAtMs = runningAtMs;
job.state.lastRunStatus = "error";
job.state.lastStatus = "error";
job.state.lastError = STARTUP_INTERRUPTED_ERROR;
job.state.lastDurationMs = Math.max(0, nowMs - runningAtMs);
job.state.consecutiveErrors = previousErrors + 1;
job.state.lastDelivered = false;
job.state.lastDeliveryStatus = "unknown";
job.state.lastDeliveryError = STARTUP_INTERRUPTED_ERROR;
job.state.nextRunAtMs = undefined;
job.updatedAtMs = nowMs;
if (job.schedule.kind === "at") {
job.enabled = false;
}
return {
jobId: job.id,
runAtMs: runningAtMs,
durationMs: job.state.lastDurationMs,
};
}
function mergeManualRunSnapshotAfterReload(params: {
state: CronServiceState;
jobId: string;
@@ -90,35 +139,34 @@ export async function start(state: CronServiceState) {
return;
}
const interruptedOneShotIds = new Set<string>();
let clearedAnyRunningMarker = false;
const interruptedJobIds = new Set<string>();
const interruptedRuns: InterruptedStartupRun[] = [];
let markedAnyInterruptedRun = false;
await locked(state, async () => {
await ensureLoaded(state, { skipRecompute: true });
const jobs = state.store?.jobs ?? [];
for (const job of jobs) {
job.state ??= {};
if (typeof job.state.runningAtMs === "number") {
state.deps.log.warn(
{ jobId: job.id, runningAtMs: job.state.runningAtMs },
"cron: clearing stale running marker on startup",
);
job.state.runningAtMs = undefined;
clearedAnyRunningMarker = true;
// One-shot jobs are not retried after interruption; recurring jobs
// (cron/every) are eligible for startup catch-up so they don't
// require a second restart to recover (#60495).
if (job.schedule.kind === "at") {
interruptedOneShotIds.add(job.id);
}
const nowMs = state.deps.nowMs();
const interrupted = markInterruptedStartupRun({
state,
job,
runningAtMs: job.state.runningAtMs,
nowMs,
});
interruptedJobIds.add(job.id);
interruptedRuns.push(interrupted);
markedAnyInterruptedRun = true;
}
}
if (clearedAnyRunningMarker) {
if (markedAnyInterruptedRun) {
await persist(state);
}
});
await runMissedJobs(state, {
skipJobIds: interruptedOneShotIds.size > 0 ? interruptedOneShotIds : undefined,
skipJobIds: interruptedJobIds.size > 0 ? interruptedJobIds : undefined,
});
await locked(state, async () => {
@@ -130,6 +178,21 @@ export async function start(state: CronServiceState) {
if (changed) {
await persist(state);
}
for (const interrupted of interruptedRuns) {
const job = state.store?.jobs.find((entry) => entry.id === interrupted.jobId);
emit(state, {
jobId: interrupted.jobId,
action: "finished",
status: "error",
error: STARTUP_INTERRUPTED_ERROR,
delivered: false,
deliveryStatus: "unknown",
deliveryError: STARTUP_INTERRUPTED_ERROR,
runAtMs: interrupted.runAtMs,
durationMs: interrupted.durationMs,
nextRunAtMs: job?.state.nextRunAtMs,
});
}
armTimer(state);
state.deps.log.info(
{