From 38543d819690155b350fa358cf68b98d11a0fe6d Mon Sep 17 00:00:00 2001 From: Tyler Yust <64381258+tyler6204@users.noreply.github.com> Date: Sun, 8 Mar 2026 14:46:33 -0700 Subject: [PATCH] fix(cron): consolidate announce delivery, fire-and-forget trigger, and minimal prompt mode (#40204) * fix(cron): consolidate announce delivery and detach manual runs * fix: queue detached cron runs (#40204) --- CHANGELOG.md | 1 + docs/automation/cron-jobs.md | 2 + docs/cli/cron.md | 2 + .../pi-embedded-runner/run/attempt.test.ts | 12 +- src/agents/pi-embedded-runner/run/attempt.ts | 4 +- src/agents/subagent-announce.timeout.test.ts | 19 ++ src/agents/subagent-announce.ts | 14 +- src/cli/cron-cli.test.ts | 22 +- src/cli/cron-cli/register.cron-simple.ts | 4 +- .../delivery-dispatch.double-announce.test.ts | 23 ++ src/cron/service.issue-regressions.test.ts | 114 ++++++- src/cron/service.ts | 4 + src/cron/service/ops.ts | 114 ++++++- src/cron/service/state.ts | 1 + src/gateway/server-methods/cron.ts | 2 +- src/gateway/server.cron.test.ts | 284 ++++++++++++++---- 16 files changed, 534 insertions(+), 88 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 74ebdce9f0f..d20ca2541a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -123,6 +123,7 @@ Docs: https://docs.openclaw.ai - Agents/compaction safeguard pre-check: skip embedded compaction before entering the Pi SDK when a session has no real conversation messages, avoiding unnecessary LLM API calls on idle sessions. (#36451) thanks @Sid-Qin. - Config/schema cache key stability: build merged schema cache keys with incremental hashing to avoid large single-string serialization and prevent `RangeError: Invalid string length` on high-cardinality plugin/channel metadata. (#36603) Thanks @powermaster888. - iMessage/cron completion announces: strip leaked inline reply tags (for example `[[reply_to:6100]]`) from user-visible completion text so announcement deliveries do not expose threading metadata. (#24600) Thanks @vincentkoc. 
+- Cron/manual run enqueue flow: queue `cron.run` requests behind the cron execution lane, return immediate `{ ok: true, enqueued: true, runId }` acknowledgements, preserve `{ ok: true, ran: false, reason }` skip responses for already-running and not-due jobs, and document the asynchronous completion flow. (#40204) - Control UI/iMessage duplicate reply routing: keep internal webchat turns on dispatcher delivery (instead of origin-channel reroute) so Control UI chats do not duplicate replies into iMessage, while preserving webchat-provider relayed routing for external surfaces. Fixes #33483. Thanks @alicexmolt. - Sessions/daily reset transcript archival: archive prior transcript files during stale-session scheduled/daily resets by capturing the previous session entry before rollover, preventing orphaned transcript files on disk. (#35493) Thanks @byungsker. - Feishu/group slash command detection: normalize group mention wrappers before command-authorization probing so mention-prefixed commands (for example `@Bot/model` and `@Bot /reset`) are recognized as gateway commands instead of being forwarded to the agent. (#35994) Thanks @liuxiaopai-ai. diff --git a/docs/automation/cron-jobs.md b/docs/automation/cron-jobs.md index b0798898910..47bae78b86f 100644 --- a/docs/automation/cron-jobs.md +++ b/docs/automation/cron-jobs.md @@ -620,6 +620,8 @@ openclaw cron run <jobId> openclaw cron run <jobId> --due ``` +`cron.run` now acknowledges once the manual run is queued, not after the job finishes. Successful queue responses look like `{ ok: true, enqueued: true, runId }`. If the job is already running or `--due` finds nothing due, the response stays `{ ok: true, ran: false, reason }`. Use `openclaw cron runs --id <jobId>` or the `cron.runs` gateway method to inspect the eventual finished entry. 
+ Edit an existing job (patch fields): ```bash diff --git a/docs/cli/cron.md b/docs/cli/cron.md index 5f5be713de1..28e61e20c99 100644 --- a/docs/cli/cron.md +++ b/docs/cli/cron.md @@ -23,6 +23,8 @@ Note: one-shot (`--at`) jobs delete after success by default. Use `--keep-after- Note: recurring jobs now use exponential retry backoff after consecutive errors (30s → 1m → 5m → 15m → 60m), then return to normal schedule after the next successful run. +Note: `openclaw cron run` now returns as soon as the manual run is queued for execution. Successful responses include `{ ok: true, enqueued: true, runId }`; use `openclaw cron runs --id <jobId>` to follow the eventual outcome. + Note: retention/pruning is controlled in config: - `cron.sessionRetention` (default `24h`) prunes completed isolated run sessions. diff --git a/src/agents/pi-embedded-runner/run/attempt.test.ts b/src/agents/pi-embedded-runner/run/attempt.test.ts index 76c4253aa4b..70bd3242f7c 100644 --- a/src/agents/pi-embedded-runner/run/attempt.test.ts +++ b/src/agents/pi-embedded-runner/run/attempt.test.ts @@ -135,9 +135,15 @@ describe("resolvePromptModeForSession", () => { expect(resolvePromptModeForSession("agent:main:subagent:child")).toBe("minimal"); }); - it("uses full mode for cron sessions", () => { - expect(resolvePromptModeForSession("agent:main:cron:job-1")).toBe("full"); - expect(resolvePromptModeForSession("agent:main:cron:job-1:run:run-abc")).toBe("full"); + it("uses minimal mode for cron sessions", () => { + expect(resolvePromptModeForSession("agent:main:cron:job-1")).toBe("minimal"); + expect(resolvePromptModeForSession("agent:main:cron:job-1:run:run-abc")).toBe("minimal"); + }); + + it("uses full mode for regular and undefined sessions", () => { + expect(resolvePromptModeForSession(undefined)).toBe("full"); + expect(resolvePromptModeForSession("agent:main")).toBe("full"); + expect(resolvePromptModeForSession("agent:main:thread:abc")).toBe("full"); }); }); diff --git 
a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index b57159e52aa..e480eb77797 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -19,7 +19,7 @@ import type { PluginHookBeforeAgentStartResult, PluginHookBeforePromptBuildResult, } from "../../../plugins/types.js"; -import { isSubagentSessionKey } from "../../../routing/session-key.js"; +import { isCronSessionKey, isSubagentSessionKey } from "../../../routing/session-key.js"; import { joinPresentTextSegments } from "../../../shared/text/join-segments.js"; import { resolveSignalReactionLevel } from "../../../signal/reaction-level.js"; import { resolveTelegramInlineButtonsScope } from "../../../telegram/inline-buttons.js"; @@ -613,7 +613,7 @@ export function resolvePromptModeForSession(sessionKey?: string): "minimal" | "f if (!sessionKey) { return "full"; } - return isSubagentSessionKey(sessionKey) ? "minimal" : "full"; + return isSubagentSessionKey(sessionKey) || isCronSessionKey(sessionKey) ? 
"minimal" : "full"; } export function resolveAttemptFsWorkspaceOnly(params: { diff --git a/src/agents/subagent-announce.timeout.test.ts b/src/agents/subagent-announce.timeout.test.ts index 346989f493e..1c4925d9272 100644 --- a/src/agents/subagent-announce.timeout.test.ts +++ b/src/agents/subagent-announce.timeout.test.ts @@ -197,6 +197,25 @@ describe("subagent announce timeout config", () => { expect(internalEvents[0]?.announceType).toBe("cron job"); }); + it("regression, keeps child announce internal when requester is a cron run session", async () => { + const cronSessionKey = "agent:main:cron:daily-check:run:run-123"; + + await runAnnounceFlowForTest("run-cron-internal", { + requesterSessionKey: cronSessionKey, + requesterDisplayKey: cronSessionKey, + requesterOrigin: { channel: "discord", to: "channel:cron-results", accountId: "acct-1" }, + }); + + const directAgentCall = findGatewayCall( + (call) => call.method === "agent" && call.expectFinal === true, + ); + expect(directAgentCall?.params?.sessionKey).toBe(cronSessionKey); + expect(directAgentCall?.params?.deliver).toBe(false); + expect(directAgentCall?.params?.channel).toBeUndefined(); + expect(directAgentCall?.params?.to).toBeUndefined(); + expect(directAgentCall?.params?.accountId).toBeUndefined(); + }); + it("regression, routes child announce to parent session instead of grandparent when parent session still exists", async () => { const parentSessionKey = "agent:main:subagent:parent"; requesterDepthResolver = (sessionKey?: string) => diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index 83391755e9c..62b2cc6f0d3 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -14,6 +14,7 @@ import type { ConversationRef } from "../infra/outbound/session-binding-service. 
import { getGlobalHookRunner } from "../plugins/hook-runner-global.js"; import { normalizeAccountId, normalizeMainKey } from "../routing/session-key.js"; import { defaultRuntime } from "../runtime.js"; +import { isCronSessionKey } from "../sessions/session-key-utils.js"; import { extractTextFromChatContent } from "../shared/chat-content.js"; import { type DeliveryContext, @@ -78,6 +79,10 @@ function resolveSubagentAnnounceTimeoutMs(cfg: ReturnType): n return Math.min(Math.max(1, Math.floor(configured)), MAX_TIMER_SAFE_TIMEOUT_MS); } +function isInternalAnnounceRequesterSession(sessionKey: string | undefined): boolean { + return getSubagentDepthFromSessionStore(sessionKey) >= 1 || isCronSessionKey(sessionKey); +} + function summarizeDeliveryError(error: unknown): string { if (error instanceof Error) { return error.message || "error"; @@ -580,8 +585,7 @@ async function resolveSubagentCompletionOrigin(params: { async function sendAnnounce(item: AnnounceQueueItem) { const cfg = loadConfig(); const announceTimeoutMs = resolveSubagentAnnounceTimeoutMs(cfg); - const requesterDepth = getSubagentDepthFromSessionStore(item.sessionKey); - const requesterIsSubagent = requesterDepth >= 1; + const requesterIsSubagent = isInternalAnnounceRequesterSession(item.sessionKey); const origin = item.origin; const threadId = origin?.threadId != null && origin.threadId !== "" ? 
String(origin.threadId) : undefined; @@ -1216,6 +1220,8 @@ export async function runSubagentAnnounceFlow(params: { } let requesterDepth = getSubagentDepthFromSessionStore(targetRequesterSessionKey); + const requesterIsInternalSession = () => + requesterDepth >= 1 || isCronSessionKey(targetRequesterSessionKey); let childCompletionFindings: string | undefined; let subagentRegistryRuntime: @@ -1339,7 +1345,7 @@ export async function runSubagentAnnounceFlow(params: { const announceSessionId = childSessionId || "unknown"; const findings = childCompletionFindings || reply || "(no output)"; - let requesterIsSubagent = requesterDepth >= 1; + let requesterIsSubagent = requesterIsInternalSession(); if (requesterIsSubagent) { const { isSubagentSessionRunActive, @@ -1363,7 +1369,7 @@ export async function runSubagentAnnounceFlow(params: { targetRequesterOrigin = normalizeDeliveryContext(fallback.requesterOrigin) ?? targetRequesterOrigin; requesterDepth = getSubagentDepthFromSessionStore(targetRequesterSessionKey); - requesterIsSubagent = requesterDepth >= 1; + requesterIsSubagent = requesterIsInternalSession(); } } } diff --git a/src/cli/cron-cli.test.ts b/src/cli/cron-cli.test.ts index 562a239385d..a6b20ca5b3d 100644 --- a/src/cli/cron-cli.test.ts +++ b/src/cli/cron-cli.test.ts @@ -156,7 +156,11 @@ async function expectCronEditWithScheduleLookupExit( ).rejects.toThrow("__exit__:1"); } -async function runCronRunAndCaptureExit(params: { ran: boolean; args?: string[] }) { +async function runCronRunAndCaptureExit(params: { + ran?: boolean; + enqueued?: boolean; + args?: string[]; +}) { resetGatewayMock(); callGatewayFromCli.mockImplementation( async (method: string, _opts: unknown, callParams?: unknown) => { @@ -164,7 +168,12 @@ async function runCronRunAndCaptureExit(params: { ran: boolean; args?: string[] return { enabled: true }; } if (method === "cron.run") { - return { ok: true, params: callParams, ran: params.ran }; + return { + ok: true, + params: callParams, + ...(typeof 
params.ran === "boolean" ? { ran: params.ran } : {}), + ...(typeof params.enqueued === "boolean" ? { enqueued: params.enqueued } : {}), + }; } return { ok: true, params: callParams }; }, @@ -195,13 +204,18 @@ describe("cron cli", () => { ran: true, expectedExitCode: 0, }, + { + name: "exits 0 for cron run when job is queued successfully", + enqueued: true, + expectedExitCode: 0, + }, { name: "exits 1 for cron run when job does not execute", ran: false, expectedExitCode: 1, }, - ])("$name", async ({ ran, expectedExitCode }) => { - const { exitSpy } = await runCronRunAndCaptureExit({ ran }); + ])("$name", async ({ ran, enqueued, expectedExitCode }) => { + const { exitSpy } = await runCronRunAndCaptureExit({ ran, enqueued }); expect(exitSpy).toHaveBeenCalledWith(expectedExitCode); }); diff --git a/src/cli/cron-cli/register.cron-simple.ts b/src/cli/cron-cli/register.cron-simple.ts index ae05ff1fa69..891d8691968 100644 --- a/src/cli/cron-cli/register.cron-simple.ts +++ b/src/cli/cron-cli/register.cron-simple.ts @@ -99,8 +99,8 @@ export function registerCronSimpleCommands(cron: Command) { mode: opts.due ? "due" : "force", }); printCronJson(res); - const result = res as { ok?: boolean; ran?: boolean } | undefined; - defaultRuntime.exit(result?.ok && result?.ran ? 0 : 1); + const result = res as { ok?: boolean; ran?: boolean; enqueued?: boolean } | undefined; + defaultRuntime.exit(result?.ok && (result?.ran || result?.enqueued) ? 
0 : 1); } catch (err) { handleCronCliError(err); } diff --git a/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts b/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts index fdb77fc22ba..abaf1ae5349 100644 --- a/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts +++ b/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts @@ -208,6 +208,29 @@ describe("dispatchCronDelivery — double-announce guard", () => { expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); }); + it("consolidates descendant output into the cron announce path", async () => { + vi.mocked(countActiveDescendantRuns).mockReturnValue(0); + vi.mocked(isLikelyInterimCronMessage).mockReturnValue(true); + vi.mocked(readDescendantSubagentFallbackReply).mockResolvedValue( + "Detailed child result, everything finished successfully.", + ); + vi.mocked(runSubagentAnnounceFlow).mockResolvedValue(true); + + const params = makeBaseParams({ synthesizedText: "on it" }); + const state = await dispatchCronDelivery(params); + + expect(state.deliveryAttempted).toBe(true); + expect(state.delivered).toBe(true); + expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1); + expect(runSubagentAnnounceFlow).toHaveBeenCalledWith( + expect.objectContaining({ + roundOneReply: "Detailed child result, everything finished successfully.", + expectsCompletionMessage: true, + announceType: "cron job", + }), + ); + }); + it("normal announce success delivers exactly once and sets deliveryAttempted=true", async () => { vi.mocked(countActiveDescendantRuns).mockReturnValue(0); vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false); diff --git a/src/cron/service.issue-regressions.test.ts b/src/cron/service.issue-regressions.test.ts index b2276aeb398..dac28f4b0c9 100644 --- a/src/cron/service.issue-regressions.test.ts +++ b/src/cron/service.issue-regressions.test.ts @@ -1,6 +1,8 @@ import fs from "node:fs/promises"; import { describe, expect, it, vi } from "vitest"; import type 
{ HeartbeatRunResult } from "../infra/heartbeat-wake.js"; +import { clearCommandLane, setCommandLaneConcurrency } from "../process/command-queue.js"; +import { CommandLane } from "../process/lanes.js"; import * as schedule from "./schedule.js"; import { createAbortAwareIsolatedRunner, @@ -15,9 +17,13 @@ import { writeCronStoreSnapshot, } from "./service.issue-regressions.test-helpers.js"; import { CronService } from "./service.js"; -import { createDeferred, createRunningCronServiceState } from "./service.test-harness.js"; +import { + createDeferred, + createNoopLogger, + createRunningCronServiceState, +} from "./service.test-harness.js"; import { computeJobNextRunAtMs } from "./service/jobs.js"; -import { run } from "./service/ops.js"; +import { enqueueRun, run } from "./service/ops.js"; import { createCronServiceState, type CronEvent } from "./service/state.js"; import { DEFAULT_JOB_TIMEOUT_MS, @@ -1486,6 +1492,110 @@ describe("Cron issue regressions", () => { expect(jobs.find((job) => job.id === second.id)?.state.lastStatus).toBe("ok"); }); + it("queues manual cron.run requests behind the cron execution lane", async () => { + vi.useRealTimers(); + clearCommandLane(CommandLane.Cron); + setCommandLaneConcurrency(CommandLane.Cron, 1); + + const store = makeStorePath(); + const dueAt = Date.parse("2026-02-06T10:05:02.000Z"); + const first = createDueIsolatedJob({ id: "queued-first", nowMs: dueAt, nextRunAtMs: dueAt }); + const second = createDueIsolatedJob({ + id: "queued-second", + nowMs: dueAt, + nextRunAtMs: dueAt, + }); + await fs.writeFile( + store.storePath, + JSON.stringify({ version: 1, jobs: [first, second] }), + "utf-8", + ); + + let now = dueAt; + let activeRuns = 0; + let peakActiveRuns = 0; + const firstRun = createDeferred<{ status: "ok"; summary: string }>(); + const secondRun = createDeferred<{ status: "ok"; summary: string }>(); + const secondStarted = createDeferred(); + const runIsolatedAgentJob = vi.fn(async (params: { job: { id: string } }) => { 
+ activeRuns += 1; + peakActiveRuns = Math.max(peakActiveRuns, activeRuns); + if (params.job.id === second.id) { + secondStarted.resolve(); + } + try { + const result = + params.job.id === first.id ? await firstRun.promise : await secondRun.promise; + now += 10; + return result; + } finally { + activeRuns -= 1; + } + }); + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + cronConfig: { maxConcurrentRuns: 1 }, + log: createNoopLogger(), + nowMs: () => now, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob, + }); + + const firstAck = await enqueueRun(state, first.id, "force"); + const secondAck = await enqueueRun(state, second.id, "force"); + expect(firstAck).toEqual({ ok: true, enqueued: true, runId: expect.any(String) }); + expect(secondAck).toEqual({ ok: true, enqueued: true, runId: expect.any(String) }); + + await vi.waitFor(() => expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1)); + expect(runIsolatedAgentJob.mock.calls[0]?.[0]).toMatchObject({ job: { id: first.id } }); + expect(peakActiveRuns).toBe(1); + + firstRun.resolve({ status: "ok", summary: "first queued run" }); + await secondStarted.promise; + expect(runIsolatedAgentJob).toHaveBeenCalledTimes(2); + expect(runIsolatedAgentJob.mock.calls[1]?.[0]).toMatchObject({ job: { id: second.id } }); + expect(peakActiveRuns).toBe(1); + + secondRun.resolve({ status: "ok", summary: "second queued run" }); + await vi.waitFor(() => { + const jobs = state.store?.jobs ?? 
[]; + expect(jobs.find((job) => job.id === first.id)?.state.lastStatus).toBe("ok"); + expect(jobs.find((job) => job.id === second.id)?.state.lastStatus).toBe("ok"); + }); + + clearCommandLane(CommandLane.Cron); + }); + + it("logs unexpected queued manual run background failures once", async () => { + vi.useRealTimers(); + clearCommandLane(CommandLane.Cron); + setCommandLaneConcurrency(CommandLane.Cron, 1); + + const dueAt = Date.parse("2026-02-06T10:05:03.000Z"); + const job = createDueIsolatedJob({ id: "queued-failure", nowMs: dueAt, nextRunAtMs: dueAt }); + const log = createNoopLogger(); + const badStore = `${makeStorePath().storePath}.dir`; + await fs.mkdir(badStore, { recursive: true }); + const state = createRunningCronServiceState({ + storePath: badStore, + log, + nowMs: () => dueAt, + jobs: [job], + }); + + const result = await enqueueRun(state, job.id, "force"); + expect(result).toEqual({ ok: true, enqueued: true, runId: expect.any(String) }); + + await vi.waitFor(() => expect(log.error).toHaveBeenCalledTimes(1)); + expect(log.error.mock.calls[0]?.[1]).toBe( + "cron: queued manual run background execution failed", + ); + + clearCommandLane(CommandLane.Cron); + }); + // Regression: isolated cron runs must not abort at 1/3 of configured timeoutSeconds. 
// The bug (issue #29774) caused the CLI-provider resume watchdog (ratio 0.3, maxMs 180 s) // to be applied on fresh sessions because a persisted cliSessionId was passed to diff --git a/src/cron/service.ts b/src/cron/service.ts index 7ccc1cc59e0..a221cb68b15 100644 --- a/src/cron/service.ts +++ b/src/cron/service.ts @@ -46,6 +46,10 @@ export class CronService { return await ops.run(this.state, id, mode); } + async enqueueRun(id: string, mode?: "due" | "force") { + return await ops.enqueueRun(this.state, id, mode); + } + getJob(id: string): CronJob | undefined { return this.state.store?.jobs.find((job) => job.id === id); } diff --git a/src/cron/service/ops.ts b/src/cron/service/ops.ts index 9f575134c23..c027c8d553f 100644 --- a/src/cron/service/ops.ts +++ b/src/cron/service/ops.ts @@ -1,3 +1,5 @@ +import { enqueueCommandInLane } from "../../process/command-queue.js"; +import { CommandLane } from "../../process/lanes.js"; import type { CronJob, CronJobCreate, CronJobPatch } from "../types.js"; import { normalizeCronCreateDeliveryInput } from "./initial-delivery.js"; import { @@ -339,8 +341,58 @@ export async function remove(state: CronServiceState, id: string) { }); } -export async function run(state: CronServiceState, id: string, mode?: "due" | "force") { - const prepared = await locked(state, async () => { +type PreparedManualRun = + | { + ok: true; + ran: false; + reason: "already-running" | "not-due"; + } + | { + ok: true; + ran: true; + jobId: string; + startedAt: number; + executionJob: CronJob; + } + | { ok: false }; + +type ManualRunDisposition = + | Extract + | { ok: true; runnable: true }; + +let nextManualRunId = 1; + +async function inspectManualRunDisposition( + state: CronServiceState, + id: string, + mode?: "due" | "force", +): Promise { + return await locked(state, async () => { + warnIfDisabled(state, "run"); + await ensureLoaded(state, { skipRecompute: true }); + // Normalize job tick state (clears stale runningAtMs markers) before + // checking if 
already running, so a stale marker from a crashed Phase-1 + // persist does not block manual triggers for up to STUCK_RUN_MS (#17554). + recomputeNextRunsForMaintenance(state); + const job = findJobOrThrow(state, id); + if (typeof job.state.runningAtMs === "number") { + return { ok: true, ran: false, reason: "already-running" as const }; + } + const now = state.deps.nowMs(); + const due = isJobDue(job, now, { forced: mode === "force" }); + if (!due) { + return { ok: true, ran: false, reason: "not-due" as const }; + } + return { ok: true, runnable: true } as const; + }); +} + +async function prepareManualRun( + state: CronServiceState, + id: string, + mode?: "due" | "force", +): Promise { + return await locked(state, async () => { warnIfDisabled(state, "run"); await ensureLoaded(state, { skipRecompute: true }); // Normalize job tick state (clears stale runningAtMs markers) before @@ -365,7 +417,7 @@ export async function run(state: CronServiceState, id: string, mode?: "due" | "f // force-reload from disk cannot start the same job concurrently. 
await persist(state); emit(state, { jobId: job.id, action: "started", runAtMs: now }); - const executionJob = JSON.parse(JSON.stringify(job)) as typeof job; + const executionJob = JSON.parse(JSON.stringify(job)) as CronJob; return { ok: true, ran: true, @@ -374,13 +426,13 @@ export async function run(state: CronServiceState, id: string, mode?: "due" | "f executionJob, } as const; }); +} - if (!prepared.ran) { - return prepared; - } - if (!prepared.executionJob || typeof prepared.startedAt !== "number") { - return { ok: false } as const; - } +async function finishPreparedManualRun( + state: CronServiceState, + prepared: Extract, + mode?: "due" | "force", +): Promise { const executionJob = prepared.executionJob; const startedAt = prepared.startedAt; const jobId = prepared.jobId; @@ -461,10 +513,54 @@ export async function run(state: CronServiceState, id: string, mode?: "due" | "f await persist(state); armTimer(state); }); +} +export async function run(state: CronServiceState, id: string, mode?: "due" | "force") { + const prepared = await prepareManualRun(state, id, mode); + if (!prepared.ok || !prepared.ran) { + return prepared; + } + await finishPreparedManualRun(state, prepared, mode); return { ok: true, ran: true } as const; } +export async function enqueueRun(state: CronServiceState, id: string, mode?: "due" | "force") { + const disposition = await inspectManualRunDisposition(state, id, mode); + if (!disposition.ok || !("runnable" in disposition && disposition.runnable)) { + return disposition; + } + + const runId = `manual:${id}:${state.deps.nowMs()}:${nextManualRunId++}`; + void enqueueCommandInLane( + CommandLane.Cron, + async () => { + const result = await run(state, id, mode); + if (result.ok && "ran" in result && !result.ran) { + state.deps.log.info( + { jobId: id, runId, reason: result.reason }, + "cron: queued manual run skipped before execution", + ); + } + return result; + }, + { + warnAfterMs: 5_000, + onWait: (waitMs, queuedAhead) => { + 
state.deps.log.warn( + { jobId: id, runId, waitMs, queuedAhead }, + "cron: queued manual run waiting for an execution slot", + ); + }, + }, + ).catch((err) => { + state.deps.log.error( + { jobId: id, runId, err: String(err) }, + "cron: queued manual run background execution failed", + ); + }); + return { ok: true, enqueued: true, runId } as const; +} + export function wakeNow( state: CronServiceState, opts: { mode: "now" | "next-heartbeat"; text: string }, diff --git a/src/cron/service/state.ts b/src/cron/service/state.ts index b65d0ebaa14..1e42ae089cd 100644 --- a/src/cron/service/state.ts +++ b/src/cron/service/state.ts @@ -142,6 +142,7 @@ export type CronStatusSummary = { export type CronRunResult = | { ok: true; ran: true } + | { ok: true; enqueued: true; runId: string } | { ok: true; ran: false; reason: "not-due" } | { ok: true; ran: false; reason: "already-running" } | { ok: false }; diff --git a/src/gateway/server-methods/cron.ts b/src/gateway/server-methods/cron.ts index a6549c503f6..830d12c9509 100644 --- a/src/gateway/server-methods/cron.ts +++ b/src/gateway/server-methods/cron.ts @@ -212,7 +212,7 @@ export const cronHandlers: GatewayRequestHandlers = { ); return; } - const result = await context.cron.run(jobId, p.mode ?? "force"); + const result = await context.cron.enqueueRun(jobId, p.mode ?? 
"force"); respond(true, result, undefined); }, "cron.runs": async ({ params, respond, context }) => { diff --git a/src/gateway/server.cron.test.ts b/src/gateway/server.cron.test.ts index 4a21354605d..ccaf5441237 100644 --- a/src/gateway/server.cron.test.ts +++ b/src/gateway/server.cron.test.ts @@ -9,6 +9,7 @@ import { connectOk, cronIsolatedRun, installGatewayTestHooks, + onceMessage, rpcReq, startServerWithClient, testState, @@ -35,7 +36,6 @@ vi.mock("../infra/net/fetch-guard.js", () => ({ })); installGatewayTestHooks({ scope: "suite" }); -const CRON_WAIT_INTERVAL_MS = 5; const CRON_WAIT_TIMEOUT_MS = 3_000; const EMPTY_CRON_STORE_CONTENT = JSON.stringify({ version: 1, jobs: [] }); let cronSuiteTempRootPromise: Promise | null = null; @@ -69,16 +69,20 @@ async function rmTempDir(dir: string) { await fs.rm(dir, { recursive: true, force: true }); } -async function waitForCondition(check: () => boolean | Promise, timeoutMs = 2000) { - await vi.waitFor( - async () => { - const ok = await check(); - if (!ok) { - throw new Error("condition not met"); - } +async function waitForCronEvent( + ws: WebSocket, + check: (payload: Record | null) => boolean, + timeoutMs = CRON_WAIT_TIMEOUT_MS, +) { + const message = await onceMessage( + ws, + (obj) => { + const payload = obj.payload ?? null; + return obj.type === "event" && obj.event === "cron" && check(payload); }, - { timeout: timeoutMs, interval: CRON_WAIT_INTERVAL_MS }, + timeoutMs, ); + return message.payload ?? 
null; } async function createCronCasePaths(tempPrefix: string): Promise<{ @@ -178,6 +182,8 @@ async function addWebhookCronJob(params: { async function runCronJobForce(ws: WebSocket, id: string) { const response = await rpcReq(ws, "cron.run", { id, mode: "force" }, 20_000); expect(response.ok).toBe(true); + expect(response.payload).toEqual({ ok: true, enqueued: true, runId: expect.any(String) }); + return response; } function getWebhookCall(index: number) { @@ -263,6 +269,7 @@ describe("gateway server cron", () => { const runRes = await rpcReq(ws, "cron.run", { id: routeJobId, mode: "force" }, 20_000); expect(runRes.ok).toBe(true); + expect(runRes.payload).toEqual({ ok: true, enqueued: true, runId: expect.any(String) }); const events = await waitForSystemEvent(); expect(events.some((event) => event.includes("cron route check"))).toBe(true); @@ -441,7 +448,7 @@ describe("gateway server cron", () => { }); test("writes cron run history and auto-runs due jobs", async () => { - const { prevSkipCron, dir } = await setupCronTestRun({ + const { prevSkipCron } = await setupCronTestRun({ tempPrefix: "openclaw-gw-cron-log-", }); @@ -463,31 +470,21 @@ describe("gateway server cron", () => { const jobId = typeof jobIdValue === "string" ? jobIdValue : ""; expect(jobId.length > 0).toBe(true); + const finishedRun = waitForCronEvent( + ws, + (payload) => payload?.jobId === jobId && payload?.action === "finished", + ); const runRes = await rpcReq(ws, "cron.run", { id: jobId, mode: "force" }, 20_000); expect(runRes.ok).toBe(true); - const logPath = path.join(dir, "cron", "runs", `${jobId}.jsonl`); - let raw = ""; - await waitForCondition(async () => { - raw = await fs.readFile(logPath, "utf-8").catch(() => ""); - return raw.trim().length > 0; - }, CRON_WAIT_TIMEOUT_MS); - const line = raw - .split("\n") - .map((l) => l.trim()) - .filter(Boolean) - .at(-1); - const last = JSON.parse(line ?? 
"{}") as { - jobId?: unknown; - action?: unknown; - status?: unknown; - summary?: unknown; - deliveryStatus?: unknown; - }; - expect(last.action).toBe("finished"); - expect(last.jobId).toBe(jobId); - expect(last.status).toBe("ok"); - expect(last.summary).toBe("hello"); - expect(last.deliveryStatus).toBe("not-requested"); + expect(runRes.payload).toEqual({ ok: true, enqueued: true, runId: expect.any(String) }); + const finishedPayload = await finishedRun; + expect(finishedPayload).toMatchObject({ + jobId, + action: "finished", + status: "ok", + summary: "hello", + deliveryStatus: "not-requested", + }); const runsRes = await rpcReq(ws, "cron.runs", { id: jobId, limit: 50 }); expect(runsRes.ok).toBe(true); @@ -522,7 +519,7 @@ describe("gateway server cron", () => { const autoRes = await rpcReq(ws, "cron.add", { name: "auto run test", enabled: true, - schedule: { kind: "at", at: new Date(Date.now() + 50).toISOString() }, + schedule: { kind: "at", at: new Date(Date.now() + 200).toISOString() }, sessionTarget: "main", wakeMode: "next-heartbeat", payload: { kind: "systemEvent", text: "auto" }, @@ -532,11 +529,10 @@ describe("gateway server cron", () => { const autoJobId = typeof autoJobIdValue === "string" ? 
autoJobIdValue : ""; expect(autoJobId.length > 0).toBe(true); - await waitForCondition(async () => { - const runsRes = await rpcReq(ws, "cron.runs", { id: autoJobId, limit: 10 }); - const runsPayload = runsRes.payload as { entries?: unknown } | undefined; - return Array.isArray(runsPayload?.entries) && runsPayload.entries.length > 0; - }, CRON_WAIT_TIMEOUT_MS); + await waitForCronEvent( + ws, + (payload) => payload?.jobId === autoJobId && payload?.action === "finished", + ); const autoEntries = (await rpcReq(ws, "cron.runs", { id: autoJobId, limit: 10 })).payload as | { entries?: Array<{ jobId?: unknown }> } | undefined; @@ -548,6 +544,162 @@ describe("gateway server cron", () => { } }, 45_000); + test("returns from cron.run immediately while isolated work continues in background", async () => { + const { prevSkipCron } = await setupCronTestRun({ + tempPrefix: "openclaw-gw-cron-run-detached-", + }); + + const { server, ws } = await startServerWithClient(); + await connectOk(ws); + + let resolveRun: ((value: { status: "ok"; summary: string }) => void) | undefined; + cronIsolatedRun.mockImplementationOnce( + () => + new Promise((resolve) => { + resolveRun = resolve as (value: { status: "ok"; summary: string }) => void; + }), + ); + + try { + const addRes = await rpcReq(ws, "cron.add", { + name: "detached run test", + enabled: true, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "do work" }, + delivery: { mode: "none" }, + }); + expect(addRes.ok).toBe(true); + const jobIdValue = (addRes.payload as { id?: unknown } | null)?.id; + const jobId = typeof jobIdValue === "string" ? 
jobIdValue : ""; + expect(jobId.length > 0).toBe(true); + + const startedRun = waitForCronEvent( + ws, + (payload) => payload?.jobId === jobId && payload?.action === "started", + ); + const finishedRun = waitForCronEvent( + ws, + (payload) => payload?.jobId === jobId && payload?.action === "finished", + ); + const runRes = await rpcReq(ws, "cron.run", { id: jobId, mode: "force" }, 1_000); + expect(runRes.ok).toBe(true); + expect(runRes.payload).toEqual({ ok: true, enqueued: true, runId: expect.any(String) }); + await startedRun; + expect(cronIsolatedRun).toHaveBeenCalledTimes(1); + + resolveRun?.({ status: "ok", summary: "background finished" }); + const finishedPayload = await finishedRun; + expect(finishedPayload).toMatchObject({ + jobId, + action: "finished", + status: "ok", + summary: "background finished", + }); + } finally { + await cleanupCronTestRun({ ws, server, prevSkipCron }); + } + }); + + test("returns already-running without starting background work", async () => { + const now = Date.now(); + let resolveRun: ((result: { status: "ok"; summary: string }) => void) | undefined; + cronIsolatedRun.mockImplementationOnce( + () => + new Promise((resolve) => { + resolveRun = resolve; + }), + ); + + const { prevSkipCron } = await setupCronTestRun({ + tempPrefix: "openclaw-gw-cron-run-busy-", + jobs: [ + { + id: "busy-job", + name: "busy job", + enabled: true, + createdAtMs: now - 60_000, + updatedAtMs: now - 60_000, + schedule: { kind: "at", at: new Date(now + 60_000).toISOString() }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "still busy" }, + delivery: { mode: "none" }, + state: { + nextRunAtMs: now + 60_000, + }, + }, + ], + }); + + const { server, ws } = await startServerWithClient(); + await connectOk(ws); + + try { + const startedRun = waitForCronEvent( + ws, + (payload) => payload?.jobId === "busy-job" && payload?.action === "started", + ); + const firstRunRes = await rpcReq(ws, "cron.run", { id: 
"busy-job", mode: "force" }, 1_000); + expect(firstRunRes.ok).toBe(true); + expect(firstRunRes.payload).toEqual({ ok: true, enqueued: true, runId: expect.any(String) }); + await startedRun; + expect(cronIsolatedRun).toHaveBeenCalledTimes(1); + + const secondRunRes = await rpcReq(ws, "cron.run", { id: "busy-job", mode: "force" }, 1_000); + expect(secondRunRes.ok).toBe(true); + expect(secondRunRes.payload).toEqual({ ok: true, ran: false, reason: "already-running" }); + expect(cronIsolatedRun).toHaveBeenCalledTimes(1); + + const finishedRun = waitForCronEvent( + ws, + (payload) => payload?.jobId === "busy-job" && payload?.action === "finished", + ); + resolveRun?.({ status: "ok", summary: "busy done" }); + await finishedRun; + } finally { + await cleanupCronTestRun({ ws, server, prevSkipCron }); + } + }); + + test("returns not-due without starting background work", async () => { + const now = Date.now(); + const { prevSkipCron } = await setupCronTestRun({ + tempPrefix: "openclaw-gw-cron-run-not-due-", + jobs: [ + { + id: "future-job", + name: "future job", + enabled: true, + createdAtMs: now - 60_000, + updatedAtMs: now - 60_000, + schedule: { kind: "at", at: new Date(now + 60_000).toISOString() }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "not yet" }, + delivery: { mode: "none" }, + state: { + nextRunAtMs: now + 60_000, + }, + }, + ], + }); + + const { server, ws } = await startServerWithClient(); + await connectOk(ws); + cronIsolatedRun.mockClear(); + + try { + const runRes = await rpcReq(ws, "cron.run", { id: "future-job", mode: "due" }, 1_000); + expect(runRes.ok).toBe(true); + expect(runRes.payload).toEqual({ ok: true, ran: false, reason: "not-due" }); + expect(cronIsolatedRun).not.toHaveBeenCalled(); + } finally { + await cleanupCronTestRun({ ws, server, prevSkipCron }); + } + }); + test("posts webhooks for delivery mode and legacy notify fallback only when summary exists", async () => { const 
legacyNotifyJob = { id: "legacy-notify-job", @@ -608,12 +760,12 @@ describe("gateway server cron", () => { name: "webhook enabled", delivery: { mode: "webhook", to: "https://example.invalid/cron-finished" }, }); - await runCronJobForce(ws, notifyJobId); - - await waitForCondition( - () => fetchWithSsrFGuardMock.mock.calls.length === 1, - CRON_WAIT_TIMEOUT_MS, + const notifyFinished = waitForCronEvent( + ws, + (payload) => payload?.jobId === notifyJobId && payload?.action === "finished", ); + await runCronJobForce(ws, notifyJobId); + await notifyFinished; const notifyCall = getWebhookCall(0); expect(notifyCall.url).toBe("https://example.invalid/cron-finished"); expect(notifyCall.init.method).toBe("POST"); @@ -623,6 +775,10 @@ describe("gateway server cron", () => { expect(notifyBody.action).toBe("finished"); expect(notifyBody.jobId).toBe(notifyJobId); + const legacyFinished = waitForCronEvent( + ws, + (payload) => payload?.jobId === "legacy-notify-job" && payload?.action === "finished", + ); const legacyRunRes = await rpcReq( ws, "cron.run", @@ -630,10 +786,8 @@ describe("gateway server cron", () => { 20_000, ); expect(legacyRunRes.ok).toBe(true); - await waitForCondition( - () => fetchWithSsrFGuardMock.mock.calls.length === 2, - CRON_WAIT_TIMEOUT_MS, - ); + expect(legacyRunRes.payload).toEqual({ ok: true, enqueued: true, runId: expect.any(String) }); + await legacyFinished; const legacyCall = getWebhookCall(1); expect(legacyCall.url).toBe("https://legacy.example.invalid/cron-finished"); expect(legacyCall.init.method).toBe("POST"); @@ -655,10 +809,14 @@ describe("gateway server cron", () => { const silentJobId = typeof silentJobIdValue === "string" ? 
silentJobIdValue : ""; expect(silentJobId.length > 0).toBe(true); + const silentFinished = waitForCronEvent( + ws, + (payload) => payload?.jobId === silentJobId && payload?.action === "finished", + ); const silentRunRes = await rpcReq(ws, "cron.run", { id: silentJobId, mode: "force" }, 20_000); expect(silentRunRes.ok).toBe(true); - await yieldToEventLoop(); - await yieldToEventLoop(); + expect(silentRunRes.payload).toEqual({ ok: true, enqueued: true, runId: expect.any(String) }); + await silentFinished; expect(fetchWithSsrFGuardMock).toHaveBeenCalledTimes(2); fetchWithSsrFGuardMock.mockClear(); @@ -677,11 +835,12 @@ describe("gateway server cron", () => { }, }, }); - await runCronJobForce(ws, failureDestJobId); - await waitForCondition( - () => fetchWithSsrFGuardMock.mock.calls.length === 1, - CRON_WAIT_TIMEOUT_MS, + const failureDestFinished = waitForCronEvent( + ws, + (payload) => payload?.jobId === failureDestJobId && payload?.action === "finished", ); + await runCronJobForce(ws, failureDestJobId); + await failureDestFinished; const failureDestCall = getWebhookCall(0); expect(failureDestCall.url).toBe("https://example.invalid/failure-destination"); const failureDestBody = failureDestCall.body; @@ -696,9 +855,12 @@ describe("gateway server cron", () => { sessionTarget: "isolated", delivery: { mode: "webhook", to: "https://example.invalid/cron-finished" }, }); + const noSummaryFinished = waitForCronEvent( + ws, + (payload) => payload?.jobId === noSummaryJobId && payload?.action === "finished", + ); await runCronJobForce(ws, noSummaryJobId); - await yieldToEventLoop(); - await yieldToEventLoop(); + await noSummaryFinished; expect(fetchWithSsrFGuardMock).toHaveBeenCalledTimes(1); } finally { await cleanupCronTestRun({ ws, server, prevSkipCron }); @@ -741,12 +903,12 @@ describe("gateway server cron", () => { name: "webhook secretinput object", delivery: { mode: "webhook", to: "https://example.invalid/cron-finished" }, }); - await runCronJobForce(ws, notifyJobId); - - 
await waitForCondition( - () => fetchWithSsrFGuardMock.mock.calls.length === 1, - CRON_WAIT_TIMEOUT_MS, + const notifyFinished = waitForCronEvent( + ws, + (payload) => payload?.jobId === notifyJobId && payload?.action === "finished", ); + await runCronJobForce(ws, notifyJobId); + await notifyFinished; const [notifyArgs] = fetchWithSsrFGuardMock.mock.calls[0] as unknown as [ { url?: string;