From 729147dcb523a06badee454897fdc201468c4e0c Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 27 Apr 2026 08:15:45 +0100 Subject: [PATCH] fix(cron): start isolated timeout after execution begins --- CHANGELOG.md | 1 + src/agents/cli-runner.ts | 1 + src/agents/cli-runner/types.ts | 1 + src/agents/pi-embedded-runner/run.ts | 1 + src/agents/pi-embedded-runner/run/params.ts | 1 + src/cron/isolated-agent/run-executor.ts | 5 + src/cron/isolated-agent/run.ts | 3 + src/cron/service/state.ts | 1 + src/cron/service/timer.regression.test.ts | 121 ++++++++++++++---- src/cron/service/timer.ts | 36 +++++- src/gateway/server-cron.ts | 3 +- .../cron/service-regression-fixtures.ts | 3 +- 12 files changed, 146 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e8d51c4295f..bc41ffc0d4b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ Docs: https://docs.openclaw.ai - Cron: add `failureAlert.includeSkipped` and `openclaw cron edit --failure-alert-include-skipped` so persistently skipped jobs can alert without counting skips as execution errors or affecting retry backoff. Fixes #60846. Thanks @slideshow-dingo. - Cron: invalidate stale pending runtime slots after live or offline `jobs.json` schedule edits, while preserving due slots for formatting-only rewrites. Fixes #27996 and #71607; carries forward #71651. Thanks @xialonglee and @fagnersouza666. - Cron: keep legacy flat `jobs.json` rows loadable while comparing split-state schedule identities, so old cron stores do not crash before in-memory hydration can normalize them. Thanks @codex. +- Cron: start isolated agent-turn execution timeouts after the runner enters its effective execution lane, so queued cron/manual runs no longer spend their whole timeout budget before useful work begins. Fixes #41783. Thanks @ayanesakura and @Hurray0. - Cron/Telegram: preserve direct-chat thread IDs and optional account IDs when inferring reminder delivery from Telegram direct-thread session keys. Fixes #44270; carries forward #44325, #44351, #44412, and #72657. Thanks @RunMintOn, @arkyu2077, @0xsline, and @vincentkoc. - Cron: omit synthetic `delivery.resolved` errors from `--no-deliver` run records while preserving explicit no-deliver target traces for agent-initiated messages. Fixes #72210; carries forward #72219. Thanks @hatemclawbot-collab and @xydigit-sj. - Cron: classify isolated runs as errors from structured embedded-run execution-denial metadata, with final-output marker fallback for `SYSTEM_RUN_DENIED`, `INVALID_REQUEST`, and approval-binding refusals, so blocked commands no longer appear green in cron history. Fixes #67172; carries forward #67186. Thanks @oc-gh-dr, @hclsys, and @1yihui. diff --git a/src/agents/cli-runner.ts b/src/agents/cli-runner.ts index 6645ed345e2..aad68bb0dfa 100644 --- a/src/agents/cli-runner.ts +++ b/src/agents/cli-runner.ts @@ -64,6 +64,7 @@ function buildCliHookAssistantMessage(params: { export async function runCliAgent(params: RunCliAgentParams): Promise { // Cron gate must fire before prepareCliRunContext — that call allocates // backend resources released only by runPreparedCliAgent's try…finally. + params.onExecutionStarted?.(); if (params.trigger === "cron") { const startedAt = Date.now(); const hookRunner = getGlobalHookRunner(); diff --git a/src/agents/cli-runner/types.ts b/src/agents/cli-runner/types.ts index c9c063b6e1e..be54020c9f5 100644 --- a/src/agents/cli-runner/types.ts +++ b/src/agents/cli-runner/types.ts @@ -44,6 +44,7 @@ export type RunCliAgentParams = { agentAccountId?: string; senderIsOwner?: boolean; abortSignal?: AbortSignal; + onExecutionStarted?: () => void; replyOperation?: ReplyOperation; /** * Close any long-lived CLI live session created for this run after the run diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index 6094a5ba0aa..5f2bf0c83dd 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -322,6 +322,7 @@ export async function runEmbeddedPiAgent( return enqueueGlobal(async () => { throwIfAborted(); const started = Date.now(); + params.onExecutionStarted?.(); const workspaceResolution = resolveRunWorkspaceDir({ workspaceDir: params.workspaceDir, sessionKey: params.sessionKey, diff --git a/src/agents/pi-embedded-runner/run/params.ts b/src/agents/pi-embedded-runner/run/params.ts index 544d92c9243..12e8192a94d 100644 --- a/src/agents/pi-embedded-runner/run/params.ts +++ b/src/agents/pi-embedded-runner/run/params.ts @@ -125,6 +125,7 @@ export type RunEmbeddedPiAgentParams = { timeoutMs: number; runId: string; abortSignal?: AbortSignal; + onExecutionStarted?: () => void; replyOperation?: ReplyOperation; shouldEmitToolResult?: () => boolean; shouldEmitToolOutput?: () => boolean; diff --git a/src/cron/isolated-agent/run-executor.ts b/src/cron/isolated-agent/run-executor.ts index bbcc1d5ba79..927a5f14d8f 100644 --- a/src/cron/isolated-agent/run-executor.ts +++ b/src/cron/isolated-agent/run-executor.ts @@ -87,6 +87,7 @@ export function createCronPromptExecutor(params: { cronSession: MutableCronSession; abortSignal?: AbortSignal; abortReason: () => string; + onExecutionStarted?: () => void; }) { const sessionFile = params.cronSession.sessionEntry.sessionFile?.trim() || @@ -145,6 +146,7 @@ export function createCronPromptExecutor(params: { skillsSnapshot: params.skillsSnapshot, messageChannel: params.messageChannel, abortSignal: params.abortSignal, + onExecutionStarted: params.onExecutionStarted, bootstrapPromptWarningSignaturesSeen, bootstrapPromptWarningSignature, senderIsOwner: true, @@ -213,6 +215,7 @@ export function createCronPromptExecutor(params: { forceMessageTool: params.toolPolicy.forceMessageTool, allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe, abortSignal: params.abortSignal, + onExecutionStarted: params.onExecutionStarted, bootstrapPromptWarningSignaturesSeen, bootstrapPromptWarningSignature, }); @@ -273,6 +276,7 @@ export async function executeCronRun(params: { abortSignal?: AbortSignal; abortReason: () => string; isAborted: () => boolean; + onExecutionStarted?: () => void; thinkLevel: ThinkLevel | undefined; timeoutMs: number; suppressExecNotifyOnExit: boolean; @@ -309,6 +313,7 @@ export async function executeCronRun(params: { cronSession: params.cronSession, abortSignal: params.abortSignal, abortReason: params.abortReason, + onExecutionStarted: params.onExecutionStarted, }); const runStartedAt = params.runStartedAt ?? Date.now(); diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index f304b61cf5b..7286025e1d6 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -416,6 +416,7 @@ type RunCronAgentTurnParams = { message: string; abortSignal?: AbortSignal; signal?: AbortSignal; + onExecutionStarted?: () => void; sessionKey: string; agentId?: string; lane?: string; @@ -968,6 +969,7 @@ export async function runCronIsolatedAgentTurn(params: { message: string; abortSignal?: AbortSignal; signal?: AbortSignal; + onExecutionStarted?: () => void; sessionKey: string; agentId?: string; lane?: string; @@ -1013,6 +1015,7 @@ export async function runCronIsolatedAgentTurn(params: { commandBody: prepared.context.commandBody, persistSessionEntry: prepared.context.persistSessionEntry, abortSignal, + onExecutionStarted: params.onExecutionStarted, abortReason, isAborted, thinkLevel: prepared.context.thinkLevel, diff --git a/src/cron/service/state.ts b/src/cron/service/state.ts index 80d277eecc9..e2c19a55ed2 100644 --- a/src/cron/service/state.ts +++ b/src/cron/service/state.ts @@ -86,6 +86,7 @@ export type CronServiceDeps = { job: CronJob; message: string; abortSignal?: AbortSignal; + onExecutionStarted?: () => void; }) => Promise< { summary?: string; diff --git a/src/cron/service/timer.regression.test.ts b/src/cron/service/timer.regression.test.ts index f010df729bc..9938cfe25c3 100644 --- a/src/cron/service/timer.regression.test.ts +++ b/src/cron/service/timer.regression.test.ts @@ -615,6 +615,74 @@ describe("cron service timer regressions", () => { } }); + it("does not spend isolated execution timeout while waiting for the runner lane (#41783)", async () => { + vi.useFakeTimers(); + try { + const store = timerRegressionFixtures.makeStorePath(); + const scheduledAt = Date.parse("2026-02-15T13:00:00.000Z"); + const cronJob = createIsolatedRegressionJob({ + id: "timeout-after-lane-start", + name: "timeout after lane start", + scheduledAt, + schedule: { kind: "at", at: new Date(scheduledAt).toISOString() }, + payload: { kind: "agentTurn", message: "work", timeoutSeconds: FAST_TIMEOUT_SECONDS }, + state: { nextRunAtMs: scheduledAt }, + }); + await writeCronJobs(store.storePath, [cronJob]); + + let now = scheduledAt; + const runnerEntered = createDeferred(); + const laneAcquired = createDeferred(); + let observedAbortSignal: AbortSignal | undefined; + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + nowMs: () => now, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async ({ abortSignal, onExecutionStarted }) => { + observedAbortSignal = abortSignal; + runnerEntered.resolve(); + await laneAcquired.promise; + onExecutionStarted?.(); + await new Promise((resolve) => { + if (!abortSignal) { + resolve(); + return; + } + if (abortSignal.aborted) { + resolve(); + return; + } + abortSignal.addEventListener("abort", () => resolve(), { once: true }); + }); + now += 5; + return { status: "ok" as const, summary: "late" }; + }), + }); + + const timerPromise = onTimer(state); + await runnerEntered.promise; + await vi.advanceTimersByTimeAsync(Math.ceil(FAST_TIMEOUT_SECONDS * 1_000) + 10); + expect(observedAbortSignal?.aborted).toBe(false); + + laneAcquired.resolve(); + await Promise.resolve(); + expect(observedAbortSignal?.aborted).toBe(false); + + await vi.advanceTimersByTimeAsync(Math.ceil(FAST_TIMEOUT_SECONDS * 1_000) + 10); + await timerPromise; + + expect(observedAbortSignal?.aborted).toBe(true); + const job = state.store?.jobs.find((entry) => entry.id === "timeout-after-lane-start"); + expect(job?.state.lastStatus).toBe("error"); + expect(job?.state.lastError).toContain("timed out"); + } finally { + vi.useRealTimers(); + } + }); + it("suppresses isolated follow-up side effects after timeout", async () => { vi.useFakeTimers(); try { @@ -981,30 +1049,39 @@ describe("cron service timer regressions", () => { nowMs: () => now, enqueueSystemEvent: vi.fn(), requestHeartbeatNow: vi.fn(), - runIsolatedAgentJob: vi.fn(async ({ abortSignal }: { abortSignal?: AbortSignal }) => { - started.resolve(); - await new Promise((resolve) => { - if (!abortSignal) { - resolve(); - return; - } - if (abortSignal.aborted) { - abortWallMs = Date.now(); - resolve(); - return; - } - abortSignal.addEventListener( - "abort", - () => { + runIsolatedAgentJob: vi.fn( + async ({ + abortSignal, + onExecutionStarted, + }: { + abortSignal?: AbortSignal; + onExecutionStarted?: () => void; + }) => { + onExecutionStarted?.(); + started.resolve(); + await new Promise((resolve) => { + if (!abortSignal) { + resolve(); + return; + } + if (abortSignal.aborted) { abortWallMs = Date.now(); resolve(); - }, - { once: true }, - ); - }); - now += 5; - return { status: "ok" as const, summary: "done" }; - }), + return; + } + abortSignal.addEventListener( + "abort", + () => { + abortWallMs = Date.now(); + resolve(); + }, + { once: true }, + ); + }); + now += 5; + return { status: "ok" as const, summary: "done" }; + }, + ), }); const timerPromise = onTimer(state); diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index ee0c5ec6bdc..f3c0ba53840 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -97,15 +97,30 @@ export async function executeJobCoreWithTimeout( const runAbortController = new AbortController(); let timeoutId: NodeJS.Timeout | undefined; + let rejectTimeout: ((reason?: unknown) => void) | undefined; + const timeoutPromise = new Promise((_, reject) => { + rejectTimeout = reject; + }); + const startTimeout = () => { + if (timeoutId) { + return; + } + timeoutId = setTimeout(() => { + runAbortController.abort(timeoutErrorMessage()); + rejectTimeout?.(new Error(timeoutErrorMessage())); + }, jobTimeoutMs); + }; + const deferTimeoutUntilExecutionStart = + job.sessionTarget !== "main" && job.payload.kind === "agentTurn"; + if (!deferTimeoutUntilExecutionStart) { + startTimeout(); + } try { return await Promise.race([ - executeJobCore(state, job, runAbortController.signal), - new Promise((_, reject) => { - timeoutId = setTimeout(() => { - runAbortController.abort(timeoutErrorMessage()); - reject(new Error(timeoutErrorMessage())); - }, jobTimeoutMs); + executeJobCore(state, job, runAbortController.signal, { + onExecutionStarted: deferTimeoutUntilExecutionStart ? startTimeout : undefined, }), + timeoutPromise, ]); } finally { if (timeoutId) { @@ -1178,6 +1193,9 @@ export async function executeJobCore( state: CronServiceState, job: CronJob, abortSignal?: AbortSignal, + options?: { + onExecutionStarted?: () => void; + }, ): Promise< CronRunOutcome & CronRunTelemetry & { @@ -1219,7 +1237,7 @@ export async function executeJobCore( return await executeMainSessionCronJob(state, job, abortSignal, waitWithAbort); } - return await executeDetachedCronJob(state, job, abortSignal, resolveAbortError); + return await executeDetachedCronJob(state, job, abortSignal, resolveAbortError, options); } async function executeMainSessionCronJob( @@ -1329,6 +1347,9 @@ async function executeDetachedCronJob( job: CronJob, abortSignal: AbortSignal | undefined, resolveAbortError: () => { status: "error"; error: string }, + options?: { + onExecutionStarted?: () => void; + }, ): Promise< CronRunOutcome & CronRunTelemetry & { @@ -1348,6 +1369,7 @@ async function executeDetachedCronJob( job, message: job.payload.message, abortSignal, + onExecutionStarted: options?.onExecutionStarted, }); if (abortSignal?.aborted) { diff --git a/src/gateway/server-cron.ts b/src/gateway/server-cron.ts index ca919f386e1..6deefc776a1 100644 --- a/src/gateway/server-cron.ts +++ b/src/gateway/server-cron.ts @@ -221,7 +221,7 @@ export function buildGatewayCronService(params: { deps: { ...params.deps, runtime: defaultRuntime }, }); }, - runIsolatedAgentJob: async ({ job, message, abortSignal }) => { + runIsolatedAgentJob: async ({ job, message, abortSignal, onExecutionStarted }) => { const { agentId, cfg: runtimeConfig } = resolveCronAgent(job.agentId); const sessionKey = resolveCronSessionTargetSessionKey(job.sessionTarget) ?? `cron:${job.id}`; try { @@ -231,6 +231,7 @@ export function buildGatewayCronService(params: { job, message, abortSignal, + onExecutionStarted, agentId, sessionKey, lane: "cron", diff --git a/test/helpers/cron/service-regression-fixtures.ts b/test/helpers/cron/service-regression-fixtures.ts index dd23c8fb195..78ece8ebb93 100644 --- a/test/helpers/cron/service-regression-fixtures.ts +++ b/test/helpers/cron/service-regression-fixtures.ts @@ -133,9 +133,10 @@ export function createDefaultIsolatedRunner(): CronServiceDeps["runIsolatedAgent export function createAbortAwareIsolatedRunner(summary = "late") { let observedAbortSignal: AbortSignal | undefined; const started = createDeferred(); - const runIsolatedAgentJob = vi.fn(async ({ abortSignal }) => { + const runIsolatedAgentJob = vi.fn(async ({ abortSignal, onExecutionStarted }) => { observedAbortSignal = abortSignal; started.resolve(); + onExecutionStarted?.(); await new Promise((resolve) => { if (!abortSignal) { return;