fix: avoid cron cancel runtime cycle

This commit is contained in:
Shakker
2026-06-08 20:03:18 +01:00
committed by Shakker
parent 3cf94309d9
commit 372f85d368
9 changed files with 87 additions and 161 deletions

View File

@@ -2,71 +2,33 @@
import { resolveGlobalSingleton } from "../shared/global-singleton.js";
type CronActiveJobState = {
activeJobRuns: Map<string, Map<string, ActiveCronJobRun>>;
activeJobIds: Set<string>;
};
const CRON_ACTIVE_JOB_STATE_KEY = Symbol.for("openclaw.cron.activeJobs");
const DEFAULT_RUN_KEY = "__cron-job__";
type ActiveCronJobRun = {
runId?: string;
abortController?: AbortController;
};
type MarkCronJobActiveOptions = {
runId?: string;
abortController?: AbortController;
};
type CancelCronJobRunResult =
| { found: false; cancelled: false; reason: string }
| { found: true; cancelled: false; reason: string }
| { found: true; cancelled: true };
function getCronActiveJobState(): CronActiveJobState {
// Cron runs can cross module reload boundaries in tests and dev watch; keep
// the in-flight job map process-global so duplicate-run guards share state.
// the in-flight job set process-global so duplicate-run guards share state.
return resolveGlobalSingleton<CronActiveJobState>(CRON_ACTIVE_JOB_STATE_KEY, () => ({
activeJobRuns: new Map<string, Map<string, ActiveCronJobRun>>(),
activeJobIds: new Set<string>(),
}));
}
function normalizeRunKey(runId: string | undefined): string {
return runId?.trim() || DEFAULT_RUN_KEY;
}
/** Marks a cron job id as currently executing for duplicate-run suppression. */
export function markCronJobActive(jobId: string, opts?: MarkCronJobActiveOptions) {
export function markCronJobActive(jobId: string) {
if (!jobId) {
return;
}
const state = getCronActiveJobState();
const runs = state.activeJobRuns.get(jobId) ?? new Map<string, ActiveCronJobRun>();
runs.set(normalizeRunKey(opts?.runId), {
...(opts?.runId ? { runId: opts.runId } : {}),
...(opts?.abortController ? { abortController: opts.abortController } : {}),
});
state.activeJobRuns.set(jobId, runs);
getCronActiveJobState().activeJobIds.add(jobId);
}
/** Clears the active marker when a cron run exits or is abandoned. */
export function clearCronJobActive(jobId: string, runId?: string) {
export function clearCronJobActive(jobId: string) {
if (!jobId) {
return;
}
const state = getCronActiveJobState();
if (runId === undefined) {
state.activeJobRuns.delete(jobId);
return;
}
const runs = state.activeJobRuns.get(jobId);
if (!runs) {
return;
}
runs.delete(normalizeRunKey(runId));
if (runs.size === 0) {
state.activeJobRuns.delete(jobId);
}
getCronActiveJobState().activeJobIds.delete(jobId);
}
/** Returns whether the given cron job id is currently executing in this process. */
@@ -74,73 +36,15 @@ export function isCronJobActive(jobId: string) {
if (!jobId) {
return false;
}
return (getCronActiveJobState().activeJobRuns.get(jobId)?.size ?? 0) > 0;
return getCronActiveJobState().activeJobIds.has(jobId);
}
/** Returns whether any cron run is active in this process. */
export function hasActiveCronJobs() {
return getCronActiveJobState().activeJobRuns.size > 0;
}
/** Aborts an active cron run in the current process when one owns the task row. */
export function cancelCronJobRun(params: {
jobId?: string;
runId?: string;
reason?: string;
}): CancelCronJobRunResult {
const jobId = params.jobId?.trim();
if (!jobId) {
return {
found: false,
cancelled: false,
reason: "Cron task has no cancellable job id.",
};
}
const runs = getCronActiveJobState().activeJobRuns.get(jobId);
if (!runs || runs.size === 0) {
return {
found: false,
cancelled: false,
reason: "Cron task is not active in this gateway process.",
};
}
let run: ActiveCronJobRun | undefined;
if (params.runId) {
run = runs.get(normalizeRunKey(params.runId));
} else {
const first = runs.values().next();
run = first.done ? undefined : first.value;
}
if (!run) {
return {
found: false,
cancelled: false,
reason: "Cron task run is not active in this gateway process.",
};
}
const controller = run.abortController;
if (!controller) {
return {
found: true,
cancelled: false,
reason: "Cron task has no active cancellation handle.",
};
}
if (controller.signal.aborted) {
return {
found: true,
cancelled: false,
reason: "Cron task is already cancelling.",
};
}
controller.abort(params.reason?.trim() || "Cancelled by operator.");
return {
found: true,
cancelled: true,
};
return getCronActiveJobState().activeJobIds.size > 0;
}
/** Clears process-global cron active-job state between tests. */
export function resetCronActiveJobsForTests() {
getCronActiveJobState().activeJobRuns.clear();
getCronActiveJobState().activeJobIds.clear();
}

View File

@@ -11,14 +11,16 @@ import {
setupCronRegressionFixtures,
} from "../../../test/helpers/cron/service-regression-fixtures.js";
import { HEARTBEAT_SKIP_LANES_BUSY, type HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
import {
cancelActiveCronTaskRun,
resetActiveCronTaskRunsForTests,
} from "../../tasks/cron-task-cancel.js";
import {
cancelTaskById,
listTaskRecords,
resetTaskRegistryControlRuntimeForTests,
resetTaskRegistryForTests,
setTaskRegistryControlRuntimeForTests,
} from "../../tasks/task-registry.js";
import { cancelCronJobRun } from "../active-jobs.js";
import * as schedule from "../schedule.js";
import { loadCronStore, saveCronStore } from "../store.js";
import type {
@@ -889,13 +891,12 @@ describe("cron service timer regressions", () => {
const timerPromise = onTimer(state);
const observedAbortSignal = await runnerStarted.promise;
const runId = `cron:cancel-before-timeout:${scheduledAt}`;
const cancelled = cancelCronJobRun({
jobId: "cancel-before-timeout",
const cancelled = cancelActiveCronTaskRun({
runId,
reason: "Cancelled by operator.",
});
expect(cancelled).toEqual({ found: true, cancelled: true });
expect(cancelled).toBe(true);
expect(observedAbortSignal?.aborted).toBe(true);
await vi.advanceTimersByTimeAsync(Math.ceil(FAST_TIMEOUT_SECONDS * 1_000) + 10);
@@ -1131,13 +1132,6 @@ describe("cron service timer regressions", () => {
vi.useFakeTimers();
try {
resetTaskRegistryForTests();
setTaskRegistryControlRuntimeForTests({
getAcpSessionManager: () => ({
cancelSession: vi.fn(),
}),
killSubagentRunAdmin: vi.fn(),
cancelCronJobRun,
});
const store = timerRegressionFixtures.makeStorePath();
const scheduledAt = Date.parse("2026-02-15T13:00:00.000Z");
@@ -1227,6 +1221,7 @@ describe("cron service timer regressions", () => {
}),
);
} finally {
resetActiveCronTaskRunsForTests();
resetTaskRegistryControlRuntimeForTests();
resetTaskRegistryForTests();
vi.useRealTimers();

View File

@@ -13,6 +13,7 @@ import {
normalizeAgentId,
resolveAgentIdFromSessionKey,
} from "../../routing/session-key.js";
import { registerActiveCronTaskRun } from "../../tasks/cron-task-cancel.js";
import { deliveryContextFromSession } from "../../utils/delivery-context.shared.js";
import type { DeliveryContext } from "../../utils/delivery-context.types.js";
import { clearCronJobActive, markCronJobActive } from "../active-jobs.js";
@@ -124,14 +125,16 @@ export async function executeJobCoreWithTimeout(
opts?: { runId?: string },
): Promise<Awaited<ReturnType<typeof executeJobCore>>> {
const runAbortController = new AbortController();
markCronJobActive(job.id, {
runId: opts?.runId,
// Main-session cron jobs enqueue work into a downstream child session.
// The cron wrapper does not own that queued run, so exposing its abort
// signal would let task cancellation mark the ledger row cancelled while
// the child session can continue running.
...(job.sessionTarget !== "main" ? { abortController: runAbortController } : {}),
});
// Main-session cron jobs enqueue work into a downstream child session. The
// cron wrapper does not own that queued run, so it must not expose a task
// cancellation handle that could make the wrapper row lie about child state.
const releaseCronTaskRun =
job.sessionTarget !== "main"
? registerActiveCronTaskRun({
runId: opts?.runId,
controller: runAbortController,
})
: undefined;
const jobTimeoutMs = resolveCronJobTimeoutMs(job);
try {
if (typeof jobTimeoutMs !== "number") {
@@ -193,7 +196,7 @@ export async function executeJobCoreWithTimeout(
watchdog.dispose();
}
} finally {
clearCronJobActive(job.id, opts?.runId);
releaseCronTaskRun?.();
}
}

View File

@@ -0,0 +1,45 @@
// Process-local cancellation handles for live cron task runs.
type CronTaskCancelHandle = {
controller: AbortController;
};
const activeCronTaskRunsByRunId = new Map<string, CronTaskCancelHandle>();
export function registerActiveCronTaskRun(params: {
runId: string | undefined;
controller: AbortController;
}): (() => void) | undefined {
const runId = params.runId?.trim();
if (!runId) {
return undefined;
}
activeCronTaskRunsByRunId.set(runId, { controller: params.controller });
return () => {
if (activeCronTaskRunsByRunId.get(runId)?.controller === params.controller) {
activeCronTaskRunsByRunId.delete(runId);
}
};
}
export function cancelActiveCronTaskRun(params: {
runId: string | undefined;
reason?: string;
}): boolean {
const runId = params.runId?.trim();
if (!runId) {
return false;
}
const handle = activeCronTaskRunsByRunId.get(runId);
if (!handle) {
return false;
}
if (!handle.controller.signal.aborted) {
handle.controller.abort(params.reason?.trim() || "Cancelled by operator.");
}
return true;
}
export function resetActiveCronTaskRunsForTests(): void {
activeCronTaskRunsByRunId.clear();
}

View File

@@ -117,11 +117,6 @@ async function withTaskExecutorStateDir(run: (stateDir: string) => Promise<void>
cancelSession: hoisted.cancelSessionMock,
}),
killSubagentRunAdmin: async (params) => hoisted.killSubagentRunAdminMock(params),
cancelCronJobRun: () => ({
found: false,
cancelled: false,
reason: "Cron task is not active in this gateway process.",
}),
});
try {
await run(stateDir);

View File

@@ -1,4 +1,3 @@
// Runtime control seam for cancelling ACP sessions and subagent runs from task APIs.
export { getAcpSessionManager } from "../acp/control-plane/manager.js";
export { killSubagentRunAdmin } from "../agents/subagent-control.js";
export { cancelCronJobRun } from "../cron/active-jobs.js";

View File

@@ -22,21 +22,9 @@ export type KillSubagentRunAdmin = (params: {
sessionKey: string;
}) => Promise<KillSubagentRunAdminResult>;
export type CancelCronJobRunResult =
| { found: false; cancelled: false; reason: string }
| { found: true; cancelled: false; reason: string }
| { found: true; cancelled: true };
export type CancelCronJobRun = (params: {
jobId?: string;
runId?: string;
reason?: string;
}) => CancelCronJobRunResult;
export type TaskRegistryControlRuntime = {
getAcpSessionManager: () => {
cancelSession: CancelAcpSessionAdmin;
};
killSubagentRunAdmin: KillSubagentRunAdmin;
cancelCronJobRun: CancelCronJobRun;
};

View File

@@ -2,11 +2,7 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { AcpSessionStoreEntry } from "../acp/runtime/session-meta.js";
import { startAcpSpawnParentStreamRelay } from "../agents/acp-spawn-parent-stream.js";
import {
cancelCronJobRun,
markCronJobActive,
resetCronActiveJobsForTests,
} from "../cron/active-jobs.js";
import { resetCronActiveJobsForTests } from "../cron/active-jobs.js";
import {
emitAgentEvent,
registerAgentRunContext,
@@ -21,6 +17,7 @@ import { peekSystemEvents, resetSystemEventsForTest } from "../infra/system-even
import type { ParsedAgentSessionKey } from "../routing/session-key.js";
import { withTempDir } from "../test-helpers/temp-dir.js";
import { withEnvAsync } from "../test-utils/env.js";
import { registerActiveCronTaskRun, resetActiveCronTaskRunsForTests } from "./cron-task-cancel.js";
import {
createTaskFlowForTask as createTaskFlowForTaskOrNull,
createManagedTaskFlow as createManagedTaskFlowOrNull,
@@ -466,7 +463,6 @@ describe("task-registry", () => {
cancelSession: hoisted.cancelSessionMock,
}),
killSubagentRunAdmin: async (params) => hoisted.killSubagentRunAdminMock(params),
cancelCronJobRun,
});
});
@@ -476,6 +472,7 @@ describe("task-registry", () => {
resetHeartbeatWakeStateForTests();
resetAgentRunContextForTest();
resetCronActiveJobsForTests();
resetActiveCronTaskRunsForTests();
resetTaskRegistryControlRuntimeForTests();
resetTaskRegistryDeliveryRuntimeForTests();
resetTaskRegistryMaintenanceRuntimeForTests();
@@ -3549,9 +3546,9 @@ describe("task-registry", () => {
if (!task) {
throw new Error("expected cron task");
}
markCronJobActive("nightly-gmail-sync", {
registerActiveCronTaskRun({
runId: "cron:nightly-gmail-sync:123",
abortController,
controller: abortController,
});
const result = await cancelTaskById({

View File

@@ -18,6 +18,7 @@ import { parseAgentSessionKey } from "../routing/session-key.js";
import { normalizeDeliveryContext } from "../utils/delivery-context.shared.js";
import { isDeliverableMessageChannel } from "../utils/message-channel.js";
import { isChildlessCodexNativeSubagentTask } from "./codex-native-subagent-task.js";
import { cancelActiveCronTaskRun } from "./cron-task-cancel.js";
import {
formatTaskBlockedFollowupMessage,
formatTaskStateChangeMessage,
@@ -2081,17 +2082,16 @@ export async function cancelTaskById(params: {
try {
if (task.runtime !== "cli") {
if (task.runtime === "cron") {
const { cancelCronJobRun } = await loadTaskRegistryControlRuntime();
const result = cancelCronJobRun({
jobId: task.sourceId,
runId: task.runId,
reason: params.reason?.trim() || "Cancelled by operator.",
});
if (!result.found || !result.cancelled) {
if (
!cancelActiveCronTaskRun({
runId: task.runId,
reason: params.reason?.trim() || "Cancelled by operator.",
})
) {
return {
found: true,
cancelled: false,
reason: result.reason,
reason: "Cron task has no active cancellation handle.",
task: cloneTaskRecord(task),
};
}