diff --git a/src/cron/active-jobs.ts b/src/cron/active-jobs.ts index 5c3a55f4313..dc531d437b2 100644 --- a/src/cron/active-jobs.ts +++ b/src/cron/active-jobs.ts @@ -2,71 +2,33 @@ import { resolveGlobalSingleton } from "../shared/global-singleton.js"; type CronActiveJobState = { - activeJobRuns: Map>; + activeJobIds: Set; }; const CRON_ACTIVE_JOB_STATE_KEY = Symbol.for("openclaw.cron.activeJobs"); -const DEFAULT_RUN_KEY = "__cron-job__"; - -type ActiveCronJobRun = { - runId?: string; - abortController?: AbortController; -}; - -type MarkCronJobActiveOptions = { - runId?: string; - abortController?: AbortController; -}; - -type CancelCronJobRunResult = - | { found: false; cancelled: false; reason: string } - | { found: true; cancelled: false; reason: string } - | { found: true; cancelled: true }; function getCronActiveJobState(): CronActiveJobState { // Cron runs can cross module reload boundaries in tests and dev watch; keep - // the in-flight job map process-global so duplicate-run guards share state. + // the in-flight job set process-global so duplicate-run guards share state. return resolveGlobalSingleton(CRON_ACTIVE_JOB_STATE_KEY, () => ({ - activeJobRuns: new Map>(), + activeJobIds: new Set(), })); } -function normalizeRunKey(runId: string | undefined): string { - return runId?.trim() || DEFAULT_RUN_KEY; -} - /** Marks a cron job id as currently executing for duplicate-run suppression. */ -export function markCronJobActive(jobId: string, opts?: MarkCronJobActiveOptions) { +export function markCronJobActive(jobId: string) { if (!jobId) { return; } - const state = getCronActiveJobState(); - const runs = state.activeJobRuns.get(jobId) ?? new Map(); - runs.set(normalizeRunKey(opts?.runId), { - ...(opts?.runId ? { runId: opts.runId } : {}), - ...(opts?.abortController ? { abortController: opts.abortController } : {}), - }); - state.activeJobRuns.set(jobId, runs); + getCronActiveJobState().activeJobIds.add(jobId); } /** Clears the active marker when a cron run exits or is abandoned. */ -export function clearCronJobActive(jobId: string, runId?: string) { +export function clearCronJobActive(jobId: string) { if (!jobId) { return; } - const state = getCronActiveJobState(); - if (runId === undefined) { - state.activeJobRuns.delete(jobId); - return; - } - const runs = state.activeJobRuns.get(jobId); - if (!runs) { - return; - } - runs.delete(normalizeRunKey(runId)); - if (runs.size === 0) { - state.activeJobRuns.delete(jobId); - } + getCronActiveJobState().activeJobIds.delete(jobId); } /** Returns whether the given cron job id is currently executing in this process. */ @@ -74,73 +36,15 @@ export function isCronJobActive(jobId: string) { if (!jobId) { return false; } - return (getCronActiveJobState().activeJobRuns.get(jobId)?.size ?? 0) > 0; + return getCronActiveJobState().activeJobIds.has(jobId); } /** Returns whether any cron run is active in this process. */ export function hasActiveCronJobs() { - return getCronActiveJobState().activeJobRuns.size > 0; -} - -/** Aborts an active cron run in the current process when one owns the task row. */ -export function cancelCronJobRun(params: { - jobId?: string; - runId?: string; - reason?: string; -}): CancelCronJobRunResult { - const jobId = params.jobId?.trim(); - if (!jobId) { - return { - found: false, - cancelled: false, - reason: "Cron task has no cancellable job id.", - }; - } - const runs = getCronActiveJobState().activeJobRuns.get(jobId); - if (!runs || runs.size === 0) { - return { - found: false, - cancelled: false, - reason: "Cron task is not active in this gateway process.", - }; - } - let run: ActiveCronJobRun | undefined; - if (params.runId) { - run = runs.get(normalizeRunKey(params.runId)); - } else { - const first = runs.values().next(); - run = first.done ? undefined : first.value; - } - if (!run) { - return { - found: false, - cancelled: false, - reason: "Cron task run is not active in this gateway process.", - }; - } - const controller = run.abortController; - if (!controller) { - return { - found: true, - cancelled: false, - reason: "Cron task has no active cancellation handle.", - }; - } - if (controller.signal.aborted) { - return { - found: true, - cancelled: false, - reason: "Cron task is already cancelling.", - }; - } - controller.abort(params.reason?.trim() || "Cancelled by operator."); - return { - found: true, - cancelled: true, - }; + return getCronActiveJobState().activeJobIds.size > 0; } /** Clears process-global cron active-job state between tests. */ export function resetCronActiveJobsForTests() { - getCronActiveJobState().activeJobRuns.clear(); + getCronActiveJobState().activeJobIds.clear(); } diff --git a/src/cron/service/timer.regression.test.ts b/src/cron/service/timer.regression.test.ts index 439c5347f95..186b4da1ac8 100644 --- a/src/cron/service/timer.regression.test.ts +++ b/src/cron/service/timer.regression.test.ts @@ -11,14 +11,16 @@ import { setupCronRegressionFixtures, } from "../../../test/helpers/cron/service-regression-fixtures.js"; import { HEARTBEAT_SKIP_LANES_BUSY, type HeartbeatRunResult } from "../../infra/heartbeat-wake.js"; +import { + cancelActiveCronTaskRun, + resetActiveCronTaskRunsForTests, +} from "../../tasks/cron-task-cancel.js"; import { cancelTaskById, listTaskRecords, resetTaskRegistryControlRuntimeForTests, resetTaskRegistryForTests, - setTaskRegistryControlRuntimeForTests, } from "../../tasks/task-registry.js"; -import { cancelCronJobRun } from "../active-jobs.js"; import * as schedule from "../schedule.js"; import { loadCronStore, saveCronStore } from "../store.js"; import type { @@ -889,13 +891,12 @@ describe("cron service timer regressions", () => { const timerPromise = onTimer(state); const observedAbortSignal = await runnerStarted.promise; const runId = `cron:cancel-before-timeout:${scheduledAt}`; - const cancelled = cancelCronJobRun({ - jobId: "cancel-before-timeout", + const cancelled = cancelActiveCronTaskRun({ runId, reason: "Cancelled by operator.", }); - expect(cancelled).toEqual({ found: true, cancelled: true }); + expect(cancelled).toBe(true); expect(observedAbortSignal?.aborted).toBe(true); await vi.advanceTimersByTimeAsync(Math.ceil(FAST_TIMEOUT_SECONDS * 1_000) + 10); @@ -1131,13 +1132,6 @@ describe("cron service timer regressions", () => { vi.useFakeTimers(); try { resetTaskRegistryForTests(); - setTaskRegistryControlRuntimeForTests({ - getAcpSessionManager: () => ({ - cancelSession: vi.fn(), - }), - killSubagentRunAdmin: vi.fn(), - cancelCronJobRun, - }); const store = timerRegressionFixtures.makeStorePath(); const scheduledAt = Date.parse("2026-02-15T13:00:00.000Z"); @@ -1227,6 +1221,7 @@ describe("cron service timer regressions", () => { }), ); } finally { + resetActiveCronTaskRunsForTests(); resetTaskRegistryControlRuntimeForTests(); resetTaskRegistryForTests(); vi.useRealTimers(); diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index e38ff511697..9bc5eb97fb8 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -13,6 +13,7 @@ import { normalizeAgentId, resolveAgentIdFromSessionKey, } from "../../routing/session-key.js"; +import { registerActiveCronTaskRun } from "../../tasks/cron-task-cancel.js"; import { deliveryContextFromSession } from "../../utils/delivery-context.shared.js"; import type { DeliveryContext } from "../../utils/delivery-context.types.js"; import { clearCronJobActive, markCronJobActive } from "../active-jobs.js"; @@ -124,14 +125,16 @@ export async function executeJobCoreWithTimeout( opts?: { runId?: string }, ): Promise>> { const runAbortController = new AbortController(); - markCronJobActive(job.id, { - runId: opts?.runId, - // Main-session cron jobs enqueue work into a downstream child session. - // The cron wrapper does not own that queued run, so exposing its abort - // signal would let task cancellation mark the ledger row cancelled while - // the child session can continue running. - ...(job.sessionTarget !== "main" ? { abortController: runAbortController } : {}), - }); + // Main-session cron jobs enqueue work into a downstream child session. The + // cron wrapper does not own that queued run, so it must not expose a task + // cancellation handle that could make the wrapper row lie about child state. + const releaseCronTaskRun = + job.sessionTarget !== "main" + ? registerActiveCronTaskRun({ + runId: opts?.runId, + controller: runAbortController, + }) + : undefined; const jobTimeoutMs = resolveCronJobTimeoutMs(job); try { if (typeof jobTimeoutMs !== "number") { @@ -193,7 +196,7 @@ export async function executeJobCoreWithTimeout( watchdog.dispose(); } } finally { - clearCronJobActive(job.id, opts?.runId); + releaseCronTaskRun?.(); } } diff --git a/src/tasks/cron-task-cancel.ts b/src/tasks/cron-task-cancel.ts new file mode 100644 index 00000000000..51c6c4dfae5 --- /dev/null +++ b/src/tasks/cron-task-cancel.ts @@ -0,0 +1,45 @@ +// Process-local cancellation handles for live cron task runs. + +type CronTaskCancelHandle = { + controller: AbortController; +}; + +const activeCronTaskRunsByRunId = new Map(); + +export function registerActiveCronTaskRun(params: { + runId: string | undefined; + controller: AbortController; +}): (() => void) | undefined { + const runId = params.runId?.trim(); + if (!runId) { + return undefined; + } + activeCronTaskRunsByRunId.set(runId, { controller: params.controller }); + return () => { + if (activeCronTaskRunsByRunId.get(runId)?.controller === params.controller) { + activeCronTaskRunsByRunId.delete(runId); + } + }; +} + +export function cancelActiveCronTaskRun(params: { + runId: string | undefined; + reason?: string; +}): boolean { + const runId = params.runId?.trim(); + if (!runId) { + return false; + } + const handle = activeCronTaskRunsByRunId.get(runId); + if (!handle) { + return false; + } + if (!handle.controller.signal.aborted) { + handle.controller.abort(params.reason?.trim() || "Cancelled by operator."); + } + return true; +} + +export function resetActiveCronTaskRunsForTests(): void { + activeCronTaskRunsByRunId.clear(); +} diff --git a/src/tasks/task-executor.test.ts b/src/tasks/task-executor.test.ts index b5fb83592cc..b59e85aac58 100644 --- a/src/tasks/task-executor.test.ts +++ b/src/tasks/task-executor.test.ts @@ -117,11 +117,6 @@ async function withTaskExecutorStateDir(run: (stateDir: string) => Promise cancelSession: hoisted.cancelSessionMock, }), killSubagentRunAdmin: async (params) => hoisted.killSubagentRunAdminMock(params), - cancelCronJobRun: () => ({ - found: false, - cancelled: false, - reason: "Cron task is not active in this gateway process.", - }), }); try { await run(stateDir); diff --git a/src/tasks/task-registry-control.runtime.ts b/src/tasks/task-registry-control.runtime.ts index 8e571e04549..4795b7bd3dc 100644 --- a/src/tasks/task-registry-control.runtime.ts +++ b/src/tasks/task-registry-control.runtime.ts @@ -1,4 +1,3 @@ // Runtime control seam for cancelling ACP sessions and subagent runs from task APIs. export { getAcpSessionManager } from "../acp/control-plane/manager.js"; export { killSubagentRunAdmin } from "../agents/subagent-control.js"; -export { cancelCronJobRun } from "../cron/active-jobs.js"; diff --git a/src/tasks/task-registry-control.types.ts b/src/tasks/task-registry-control.types.ts index d2f86dce7e5..d95714ff572 100644 --- a/src/tasks/task-registry-control.types.ts +++ b/src/tasks/task-registry-control.types.ts @@ -22,21 +22,9 @@ export type KillSubagentRunAdmin = (params: { sessionKey: string; }) => Promise; -export type CancelCronJobRunResult = - | { found: false; cancelled: false; reason: string } - | { found: true; cancelled: false; reason: string } - | { found: true; cancelled: true }; - -export type CancelCronJobRun = (params: { - jobId?: string; - runId?: string; - reason?: string; -}) => CancelCronJobRunResult; - export type TaskRegistryControlRuntime = { getAcpSessionManager: () => { cancelSession: CancelAcpSessionAdmin; }; killSubagentRunAdmin: KillSubagentRunAdmin; - cancelCronJobRun: CancelCronJobRun; }; diff --git a/src/tasks/task-registry.test.ts b/src/tasks/task-registry.test.ts index 565b3089fa6..02f3a079456 100644 --- a/src/tasks/task-registry.test.ts +++ b/src/tasks/task-registry.test.ts @@ -2,11 +2,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { AcpSessionStoreEntry } from "../acp/runtime/session-meta.js"; import { startAcpSpawnParentStreamRelay } from "../agents/acp-spawn-parent-stream.js"; -import { - cancelCronJobRun, - markCronJobActive, - resetCronActiveJobsForTests, -} from "../cron/active-jobs.js"; +import { resetCronActiveJobsForTests } from "../cron/active-jobs.js"; import { emitAgentEvent, registerAgentRunContext, @@ -21,6 +17,7 @@ import { peekSystemEvents, resetSystemEventsForTest } from "../infra/system-even import type { ParsedAgentSessionKey } from "../routing/session-key.js"; import { withTempDir } from "../test-helpers/temp-dir.js"; import { withEnvAsync } from "../test-utils/env.js"; +import { registerActiveCronTaskRun, resetActiveCronTaskRunsForTests } from "./cron-task-cancel.js"; import { createTaskFlowForTask as createTaskFlowForTaskOrNull, createManagedTaskFlow as createManagedTaskFlowOrNull, @@ -466,7 +463,6 @@ describe("task-registry", () => { cancelSession: hoisted.cancelSessionMock, }), killSubagentRunAdmin: async (params) => hoisted.killSubagentRunAdminMock(params), - cancelCronJobRun, }); }); @@ -476,6 +472,7 @@ describe("task-registry", () => { resetHeartbeatWakeStateForTests(); resetAgentRunContextForTest(); resetCronActiveJobsForTests(); + resetActiveCronTaskRunsForTests(); resetTaskRegistryControlRuntimeForTests(); resetTaskRegistryDeliveryRuntimeForTests(); resetTaskRegistryMaintenanceRuntimeForTests(); @@ -3549,9 +3546,9 @@ describe("task-registry", () => { if (!task) { throw new Error("expected cron task"); } - markCronJobActive("nightly-gmail-sync", { + registerActiveCronTaskRun({ runId: "cron:nightly-gmail-sync:123", - abortController, + controller: abortController, }); const result = await cancelTaskById({ diff --git a/src/tasks/task-registry.ts b/src/tasks/task-registry.ts index 4c33b80046c..cf001e81240 100644 --- a/src/tasks/task-registry.ts +++ b/src/tasks/task-registry.ts @@ -18,6 +18,7 @@ import { parseAgentSessionKey } from "../routing/session-key.js"; import { normalizeDeliveryContext } from "../utils/delivery-context.shared.js"; import { isDeliverableMessageChannel } from "../utils/message-channel.js"; import { isChildlessCodexNativeSubagentTask } from "./codex-native-subagent-task.js"; +import { cancelActiveCronTaskRun } from "./cron-task-cancel.js"; import { formatTaskBlockedFollowupMessage, formatTaskStateChangeMessage, @@ -2081,17 +2082,16 @@ export async function cancelTaskById(params: { try { if (task.runtime !== "cli") { if (task.runtime === "cron") { - const { cancelCronJobRun } = await loadTaskRegistryControlRuntime(); - const result = cancelCronJobRun({ - jobId: task.sourceId, - runId: task.runId, - reason: params.reason?.trim() || "Cancelled by operator.", - }); - if (!result.found || !result.cancelled) { + if ( + !cancelActiveCronTaskRun({ + runId: task.runId, + reason: params.reason?.trim() || "Cancelled by operator.", + }) + ) { return { found: true, cancelled: false, - reason: result.reason, + reason: "Cron task has no active cancellation handle.", task: cloneTaskRecord(task), }; }