fix(cron): keep manual runs non-blocking

This commit is contained in:
Peter Steinberger
2026-02-22 19:47:41 +01:00
parent 91f75a2b33
commit 5db1ee4ec6
4 changed files with 139 additions and 7 deletions

View File

@@ -42,6 +42,7 @@ Docs: https://docs.openclaw.ai
- Dev tooling: prevent `CLAUDE.md` symlink target regressions by excluding CLAUDE symlink sentinels from `oxfmt` and marking them `-text` in `.gitattributes`, so formatter/EOL normalization cannot reintroduce trailing-newline targets. Thanks @vincentkoc.
- Cron: honor `cron.maxConcurrentRuns` in the timer loop so due jobs can execute up to the configured parallelism instead of always running serially. (#11595) Thanks @Takhoffman.
- Cron/Isolation: force fresh session IDs for isolated cron runs so `sessionTarget="isolated"` executions never reuse prior run context. (#23470) Thanks @echoVic.
- Cron/Service: execute manual `cron.run` jobs outside the cron lock (while still persisting started/finished state atomically) so `cron.list` and `cron.status` remain responsive during long forced runs. (#23628) Thanks @dsgraves.
- Agents/Compaction: restore embedded compaction safeguard/context-pruning extension loading in production by wiring bundled extension factories into the resource loader instead of runtime file-path resolution. (#22349) Thanks @Glucksberg.
- Feishu/Media: for inbound video messages that include both `file_key` (video) and `image_key` (thumbnail), prefer `file_key` when downloading media so video attachments are saved instead of silently failing on thumbnail keys. (#23633)
- Hooks/Cron: suppress duplicate main-session events for delivered hook turns and mark `SILENT_REPLY_TOKEN` (`NO_REPLY`) early exits as delivered to prevent hook context pollution. (#20678) Thanks @JonathanWorks.

View File

@@ -162,6 +162,59 @@ describe("CronService read ops while job is running", () => {
}
});
it("keeps list and status responsive during manual cron.run execution", async () => {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
const isolatedRun = createDeferredIsolatedRun();
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob: isolatedRun.runIsolatedAgentJob,
});
try {
await cron.start();
const job = await cron.add({
name: "manual run isolation",
enabled: true,
deleteAfterRun: false,
schedule: {
kind: "at",
at: new Date("2030-01-01T00:00:00.000Z").toISOString(),
},
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
payload: { kind: "agentTurn", message: "manual run" },
delivery: { mode: "none" },
});
const runPromise = cron.run(job.id, "force");
await isolatedRun.runStarted;
await expect(
withTimeout(cron.list({ includeDisabled: true }), 300, "cron.list during cron.run"),
).resolves.toBeTypeOf("object");
await expect(withTimeout(cron.status(), 300, "cron.status during cron.run")).resolves.toEqual(
expect.objectContaining({ enabled: true, storePath: store.storePath }),
);
isolatedRun.completeRun({ status: "ok", summary: "manual done" });
await expect(runPromise).resolves.toEqual({ ok: true, ran: true });
const completed = await cron.list({ includeDisabled: true });
expect(completed[0]?.state.lastStatus).toBe("ok");
expect(completed[0]?.state.runningAtMs).toBeUndefined();
} finally {
cron.stop();
await store.cleanup();
}
});
it("keeps list and status responsive during startup catch-up runs", async () => {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();

View File

@@ -12,7 +12,15 @@ import {
import { locked } from "./locked.js";
import type { CronServiceState } from "./state.js";
import { ensureLoaded, persist, warnIfDisabled } from "./store.js";
import { armTimer, emit, executeJob, runMissedJobs, stopTimer, wake } from "./timer.js";
import {
applyJobResult,
armTimer,
emit,
executeJobCore,
runMissedJobs,
stopTimer,
wake,
} from "./timer.js";
async function ensureLoadedForRead(state: CronServiceState) {
await ensureLoaded(state, { skipRecompute: true });
@@ -201,7 +209,7 @@ export async function remove(state: CronServiceState, id: string) {
}
export async function run(state: CronServiceState, id: string, mode?: "due" | "force") {
return await locked(state, async () => {
const prepared = await locked(state, async () => {
warnIfDisabled(state, "run");
await ensureLoaded(state, { skipRecompute: true });
const job = findJobOrThrow(state, id);
@@ -213,12 +221,82 @@ export async function run(state: CronServiceState, id: string, mode?: "due" | "f
if (!due) {
return { ok: true, ran: false, reason: "not-due" as const };
}
await executeJob(state, job, now, { forced: mode === "force" });
// Reserve this run under lock, then execute outside lock so read ops
// (`list`, `status`) stay responsive while the run is in progress.
job.state.runningAtMs = now;
job.state.lastError = undefined;
emit(state, { jobId: job.id, action: "started", runAtMs: now });
const executionJob = JSON.parse(JSON.stringify(job)) as typeof job;
return {
ok: true,
ran: true,
jobId: job.id,
startedAt: now,
executionJob,
} as const;
});
if (!prepared.ran) {
return prepared;
}
let coreResult:
| Awaited<ReturnType<typeof executeJobCore>>
| {
status: "error";
error: string;
};
try {
coreResult = await executeJobCore(state, prepared.executionJob);
} catch (err) {
coreResult = { status: "error", error: String(err) };
}
const endedAt = state.deps.nowMs();
await locked(state, async () => {
await ensureLoaded(state, { skipRecompute: true });
const job = state.store?.jobs.find((entry) => entry.id === prepared.jobId);
if (!job) {
return;
}
const shouldDelete = applyJobResult(state, job, {
status: coreResult.status,
error: coreResult.error,
delivered: coreResult.delivered,
startedAt: prepared.startedAt,
endedAt,
});
emit(state, {
jobId: job.id,
action: "finished",
status: coreResult.status,
error: coreResult.error,
summary: coreResult.summary,
delivered: coreResult.delivered,
sessionId: coreResult.sessionId,
sessionKey: coreResult.sessionKey,
runAtMs: prepared.startedAt,
durationMs: job.state.lastDurationMs,
nextRunAtMs: job.state.nextRunAtMs,
model: coreResult.model,
provider: coreResult.provider,
usage: coreResult.usage,
});
if (shouldDelete && state.store) {
state.store.jobs = state.store.jobs.filter((entry) => entry.id !== job.id);
emit(state, { jobId: job.id, action: "removed" });
}
recomputeNextRuns(state);
await persist(state);
armTimer(state);
return { ok: true, ran: true } as const;
});
return { ok: true, ran: true } as const;
}
export function wakeNow(

View File

@@ -29,7 +29,7 @@ const MIN_REFIRE_GAP_MS = 2_000;
* on top of the per-provider / per-agent timeouts to prevent one stuck job
* from wedging the entire cron lane.
*/
const DEFAULT_JOB_TIMEOUT_MS = 10 * 60_000; // 10 minutes
export const DEFAULT_JOB_TIMEOUT_MS = 10 * 60_000; // 10 minutes
type TimedCronRunOutcome = CronRunOutcome &
CronRunTelemetry & {
@@ -68,7 +68,7 @@ function errorBackoffMs(consecutiveErrors: number): number {
* Handles consecutive error tracking, exponential backoff, one-shot disable,
* and nextRunAtMs computation. Returns `true` if the job should be deleted.
*/
function applyJobResult(
export function applyJobResult(
state: CronServiceState,
job: CronJob,
result: {
@@ -556,7 +556,7 @@ export async function runDueJobs(state: CronServiceState) {
}
}
async function executeJobCore(
export async function executeJobCore(
state: CronServiceState,
job: CronJob,
abortSignal?: AbortSignal,