fix(cron): consolidate announce delivery, fire-and-forget trigger, and minimal prompt mode (#40204)

* fix(cron): consolidate announce delivery and detach manual runs

* fix: queue detached cron runs (#40204)
This commit is contained in:
Tyler Yust
2026-03-08 14:46:33 -07:00
committed by GitHub
parent 7dfd77abeb
commit 38543d8196
16 changed files with 534 additions and 88 deletions

View File

@@ -123,6 +123,7 @@ Docs: https://docs.openclaw.ai
- Agents/compaction safeguard pre-check: skip embedded compaction before entering the Pi SDK when a session has no real conversation messages, avoiding unnecessary LLM API calls on idle sessions. (#36451) thanks @Sid-Qin. - Agents/compaction safeguard pre-check: skip embedded compaction before entering the Pi SDK when a session has no real conversation messages, avoiding unnecessary LLM API calls on idle sessions. (#36451) thanks @Sid-Qin.
- Config/schema cache key stability: build merged schema cache keys with incremental hashing to avoid large single-string serialization and prevent `RangeError: Invalid string length` on high-cardinality plugin/channel metadata. (#36603) Thanks @powermaster888. - Config/schema cache key stability: build merged schema cache keys with incremental hashing to avoid large single-string serialization and prevent `RangeError: Invalid string length` on high-cardinality plugin/channel metadata. (#36603) Thanks @powermaster888.
- iMessage/cron completion announces: strip leaked inline reply tags (for example `[[reply_to:6100]]`) from user-visible completion text so announcement deliveries do not expose threading metadata. (#24600) Thanks @vincentkoc. - iMessage/cron completion announces: strip leaked inline reply tags (for example `[[reply_to:6100]]`) from user-visible completion text so announcement deliveries do not expose threading metadata. (#24600) Thanks @vincentkoc.
- Cron/manual run enqueue flow: queue `cron.run` requests behind the cron execution lane, return immediate `{ ok: true, enqueued: true, runId }` acknowledgements, preserve `{ ok: true, ran: false, reason }` skip responses for already-running and not-due jobs, and document the asynchronous completion flow. (#40204)
- Control UI/iMessage duplicate reply routing: keep internal webchat turns on dispatcher delivery (instead of origin-channel reroute) so Control UI chats do not duplicate replies into iMessage, while preserving webchat-provider relayed routing for external surfaces. Fixes #33483. Thanks @alicexmolt. - Control UI/iMessage duplicate reply routing: keep internal webchat turns on dispatcher delivery (instead of origin-channel reroute) so Control UI chats do not duplicate replies into iMessage, while preserving webchat-provider relayed routing for external surfaces. Fixes #33483. Thanks @alicexmolt.
- Sessions/daily reset transcript archival: archive prior transcript files during stale-session scheduled/daily resets by capturing the previous session entry before rollover, preventing orphaned transcript files on disk. (#35493) Thanks @byungsker. - Sessions/daily reset transcript archival: archive prior transcript files during stale-session scheduled/daily resets by capturing the previous session entry before rollover, preventing orphaned transcript files on disk. (#35493) Thanks @byungsker.
- Feishu/group slash command detection: normalize group mention wrappers before command-authorization probing so mention-prefixed commands (for example `@Bot/model` and `@Bot /reset`) are recognized as gateway commands instead of being forwarded to the agent. (#35994) Thanks @liuxiaopai-ai. - Feishu/group slash command detection: normalize group mention wrappers before command-authorization probing so mention-prefixed commands (for example `@Bot/model` and `@Bot /reset`) are recognized as gateway commands instead of being forwarded to the agent. (#35994) Thanks @liuxiaopai-ai.

View File

@@ -620,6 +620,8 @@ openclaw cron run <jobId>
openclaw cron run <jobId> --due openclaw cron run <jobId> --due
``` ```
`cron.run` now acknowledges once the manual run is queued, not after the job finishes. Successful queue responses look like `{ ok: true, enqueued: true, runId }`. If the job is already running or `--due` finds nothing due, the response stays `{ ok: true, ran: false, reason }`. Use `openclaw cron runs --id <jobId>` or the `cron.runs` gateway method to inspect the eventual finished entry.
Edit an existing job (patch fields): Edit an existing job (patch fields):
```bash ```bash

View File

@@ -23,6 +23,8 @@ Note: one-shot (`--at`) jobs delete after success by default. Use `--keep-after-
Note: recurring jobs now use exponential retry backoff after consecutive errors (30s → 1m → 5m → 15m → 60m), then return to normal schedule after the next successful run. Note: recurring jobs now use exponential retry backoff after consecutive errors (30s → 1m → 5m → 15m → 60m), then return to normal schedule after the next successful run.
Note: `openclaw cron run` now returns as soon as the manual run is queued for execution. Successful responses include `{ ok: true, enqueued: true, runId }`; use `openclaw cron runs --id <job-id>` to follow the eventual outcome.
Note: retention/pruning is controlled in config: Note: retention/pruning is controlled in config:
- `cron.sessionRetention` (default `24h`) prunes completed isolated run sessions. - `cron.sessionRetention` (default `24h`) prunes completed isolated run sessions.

View File

@@ -135,9 +135,15 @@ describe("resolvePromptModeForSession", () => {
expect(resolvePromptModeForSession("agent:main:subagent:child")).toBe("minimal"); expect(resolvePromptModeForSession("agent:main:subagent:child")).toBe("minimal");
}); });
it("uses full mode for cron sessions", () => { it("uses minimal mode for cron sessions", () => {
expect(resolvePromptModeForSession("agent:main:cron:job-1")).toBe("full"); expect(resolvePromptModeForSession("agent:main:cron:job-1")).toBe("minimal");
expect(resolvePromptModeForSession("agent:main:cron:job-1:run:run-abc")).toBe("full"); expect(resolvePromptModeForSession("agent:main:cron:job-1:run:run-abc")).toBe("minimal");
});
it("uses full mode for regular and undefined sessions", () => {
expect(resolvePromptModeForSession(undefined)).toBe("full");
expect(resolvePromptModeForSession("agent:main")).toBe("full");
expect(resolvePromptModeForSession("agent:main:thread:abc")).toBe("full");
}); });
}); });

View File

@@ -19,7 +19,7 @@ import type {
PluginHookBeforeAgentStartResult, PluginHookBeforeAgentStartResult,
PluginHookBeforePromptBuildResult, PluginHookBeforePromptBuildResult,
} from "../../../plugins/types.js"; } from "../../../plugins/types.js";
import { isSubagentSessionKey } from "../../../routing/session-key.js"; import { isCronSessionKey, isSubagentSessionKey } from "../../../routing/session-key.js";
import { joinPresentTextSegments } from "../../../shared/text/join-segments.js"; import { joinPresentTextSegments } from "../../../shared/text/join-segments.js";
import { resolveSignalReactionLevel } from "../../../signal/reaction-level.js"; import { resolveSignalReactionLevel } from "../../../signal/reaction-level.js";
import { resolveTelegramInlineButtonsScope } from "../../../telegram/inline-buttons.js"; import { resolveTelegramInlineButtonsScope } from "../../../telegram/inline-buttons.js";
@@ -613,7 +613,7 @@ export function resolvePromptModeForSession(sessionKey?: string): "minimal" | "f
if (!sessionKey) { if (!sessionKey) {
return "full"; return "full";
} }
return isSubagentSessionKey(sessionKey) ? "minimal" : "full"; return isSubagentSessionKey(sessionKey) || isCronSessionKey(sessionKey) ? "minimal" : "full";
} }
export function resolveAttemptFsWorkspaceOnly(params: { export function resolveAttemptFsWorkspaceOnly(params: {

View File

@@ -197,6 +197,25 @@ describe("subagent announce timeout config", () => {
expect(internalEvents[0]?.announceType).toBe("cron job"); expect(internalEvents[0]?.announceType).toBe("cron job");
}); });
it("regression, keeps child announce internal when requester is a cron run session", async () => {
const cronSessionKey = "agent:main:cron:daily-check:run:run-123";
await runAnnounceFlowForTest("run-cron-internal", {
requesterSessionKey: cronSessionKey,
requesterDisplayKey: cronSessionKey,
requesterOrigin: { channel: "discord", to: "channel:cron-results", accountId: "acct-1" },
});
const directAgentCall = findGatewayCall(
(call) => call.method === "agent" && call.expectFinal === true,
);
expect(directAgentCall?.params?.sessionKey).toBe(cronSessionKey);
expect(directAgentCall?.params?.deliver).toBe(false);
expect(directAgentCall?.params?.channel).toBeUndefined();
expect(directAgentCall?.params?.to).toBeUndefined();
expect(directAgentCall?.params?.accountId).toBeUndefined();
});
it("regression, routes child announce to parent session instead of grandparent when parent session still exists", async () => { it("regression, routes child announce to parent session instead of grandparent when parent session still exists", async () => {
const parentSessionKey = "agent:main:subagent:parent"; const parentSessionKey = "agent:main:subagent:parent";
requesterDepthResolver = (sessionKey?: string) => requesterDepthResolver = (sessionKey?: string) =>

View File

@@ -14,6 +14,7 @@ import type { ConversationRef } from "../infra/outbound/session-binding-service.
import { getGlobalHookRunner } from "../plugins/hook-runner-global.js"; import { getGlobalHookRunner } from "../plugins/hook-runner-global.js";
import { normalizeAccountId, normalizeMainKey } from "../routing/session-key.js"; import { normalizeAccountId, normalizeMainKey } from "../routing/session-key.js";
import { defaultRuntime } from "../runtime.js"; import { defaultRuntime } from "../runtime.js";
import { isCronSessionKey } from "../sessions/session-key-utils.js";
import { extractTextFromChatContent } from "../shared/chat-content.js"; import { extractTextFromChatContent } from "../shared/chat-content.js";
import { import {
type DeliveryContext, type DeliveryContext,
@@ -78,6 +79,10 @@ function resolveSubagentAnnounceTimeoutMs(cfg: ReturnType<typeof loadConfig>): n
return Math.min(Math.max(1, Math.floor(configured)), MAX_TIMER_SAFE_TIMEOUT_MS); return Math.min(Math.max(1, Math.floor(configured)), MAX_TIMER_SAFE_TIMEOUT_MS);
} }
function isInternalAnnounceRequesterSession(sessionKey: string | undefined): boolean {
return getSubagentDepthFromSessionStore(sessionKey) >= 1 || isCronSessionKey(sessionKey);
}
function summarizeDeliveryError(error: unknown): string { function summarizeDeliveryError(error: unknown): string {
if (error instanceof Error) { if (error instanceof Error) {
return error.message || "error"; return error.message || "error";
@@ -580,8 +585,7 @@ async function resolveSubagentCompletionOrigin(params: {
async function sendAnnounce(item: AnnounceQueueItem) { async function sendAnnounce(item: AnnounceQueueItem) {
const cfg = loadConfig(); const cfg = loadConfig();
const announceTimeoutMs = resolveSubagentAnnounceTimeoutMs(cfg); const announceTimeoutMs = resolveSubagentAnnounceTimeoutMs(cfg);
const requesterDepth = getSubagentDepthFromSessionStore(item.sessionKey); const requesterIsSubagent = isInternalAnnounceRequesterSession(item.sessionKey);
const requesterIsSubagent = requesterDepth >= 1;
const origin = item.origin; const origin = item.origin;
const threadId = const threadId =
origin?.threadId != null && origin.threadId !== "" ? String(origin.threadId) : undefined; origin?.threadId != null && origin.threadId !== "" ? String(origin.threadId) : undefined;
@@ -1216,6 +1220,8 @@ export async function runSubagentAnnounceFlow(params: {
} }
let requesterDepth = getSubagentDepthFromSessionStore(targetRequesterSessionKey); let requesterDepth = getSubagentDepthFromSessionStore(targetRequesterSessionKey);
const requesterIsInternalSession = () =>
requesterDepth >= 1 || isCronSessionKey(targetRequesterSessionKey);
let childCompletionFindings: string | undefined; let childCompletionFindings: string | undefined;
let subagentRegistryRuntime: let subagentRegistryRuntime:
@@ -1339,7 +1345,7 @@ export async function runSubagentAnnounceFlow(params: {
const announceSessionId = childSessionId || "unknown"; const announceSessionId = childSessionId || "unknown";
const findings = childCompletionFindings || reply || "(no output)"; const findings = childCompletionFindings || reply || "(no output)";
let requesterIsSubagent = requesterDepth >= 1; let requesterIsSubagent = requesterIsInternalSession();
if (requesterIsSubagent) { if (requesterIsSubagent) {
const { const {
isSubagentSessionRunActive, isSubagentSessionRunActive,
@@ -1363,7 +1369,7 @@ export async function runSubagentAnnounceFlow(params: {
targetRequesterOrigin = targetRequesterOrigin =
normalizeDeliveryContext(fallback.requesterOrigin) ?? targetRequesterOrigin; normalizeDeliveryContext(fallback.requesterOrigin) ?? targetRequesterOrigin;
requesterDepth = getSubagentDepthFromSessionStore(targetRequesterSessionKey); requesterDepth = getSubagentDepthFromSessionStore(targetRequesterSessionKey);
requesterIsSubagent = requesterDepth >= 1; requesterIsSubagent = requesterIsInternalSession();
} }
} }
} }

View File

@@ -156,7 +156,11 @@ async function expectCronEditWithScheduleLookupExit(
).rejects.toThrow("__exit__:1"); ).rejects.toThrow("__exit__:1");
} }
async function runCronRunAndCaptureExit(params: { ran: boolean; args?: string[] }) { async function runCronRunAndCaptureExit(params: {
ran?: boolean;
enqueued?: boolean;
args?: string[];
}) {
resetGatewayMock(); resetGatewayMock();
callGatewayFromCli.mockImplementation( callGatewayFromCli.mockImplementation(
async (method: string, _opts: unknown, callParams?: unknown) => { async (method: string, _opts: unknown, callParams?: unknown) => {
@@ -164,7 +168,12 @@ async function runCronRunAndCaptureExit(params: { ran: boolean; args?: string[]
return { enabled: true }; return { enabled: true };
} }
if (method === "cron.run") { if (method === "cron.run") {
return { ok: true, params: callParams, ran: params.ran }; return {
ok: true,
params: callParams,
...(typeof params.ran === "boolean" ? { ran: params.ran } : {}),
...(typeof params.enqueued === "boolean" ? { enqueued: params.enqueued } : {}),
};
} }
return { ok: true, params: callParams }; return { ok: true, params: callParams };
}, },
@@ -195,13 +204,18 @@ describe("cron cli", () => {
ran: true, ran: true,
expectedExitCode: 0, expectedExitCode: 0,
}, },
{
name: "exits 0 for cron run when job is queued successfully",
enqueued: true,
expectedExitCode: 0,
},
{ {
name: "exits 1 for cron run when job does not execute", name: "exits 1 for cron run when job does not execute",
ran: false, ran: false,
expectedExitCode: 1, expectedExitCode: 1,
}, },
])("$name", async ({ ran, expectedExitCode }) => { ])("$name", async ({ ran, enqueued, expectedExitCode }) => {
const { exitSpy } = await runCronRunAndCaptureExit({ ran }); const { exitSpy } = await runCronRunAndCaptureExit({ ran, enqueued });
expect(exitSpy).toHaveBeenCalledWith(expectedExitCode); expect(exitSpy).toHaveBeenCalledWith(expectedExitCode);
}); });

View File

@@ -99,8 +99,8 @@ export function registerCronSimpleCommands(cron: Command) {
mode: opts.due ? "due" : "force", mode: opts.due ? "due" : "force",
}); });
printCronJson(res); printCronJson(res);
const result = res as { ok?: boolean; ran?: boolean } | undefined; const result = res as { ok?: boolean; ran?: boolean; enqueued?: boolean } | undefined;
defaultRuntime.exit(result?.ok && result?.ran ? 0 : 1); defaultRuntime.exit(result?.ok && (result?.ran || result?.enqueued) ? 0 : 1);
} catch (err) { } catch (err) {
handleCronCliError(err); handleCronCliError(err);
} }

View File

@@ -208,6 +208,29 @@ describe("dispatchCronDelivery — double-announce guard", () => {
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
}); });
it("consolidates descendant output into the cron announce path", async () => {
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(true);
vi.mocked(readDescendantSubagentFallbackReply).mockResolvedValue(
"Detailed child result, everything finished successfully.",
);
vi.mocked(runSubagentAnnounceFlow).mockResolvedValue(true);
const params = makeBaseParams({ synthesizedText: "on it" });
const state = await dispatchCronDelivery(params);
expect(state.deliveryAttempted).toBe(true);
expect(state.delivered).toBe(true);
expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1);
expect(runSubagentAnnounceFlow).toHaveBeenCalledWith(
expect.objectContaining({
roundOneReply: "Detailed child result, everything finished successfully.",
expectsCompletionMessage: true,
announceType: "cron job",
}),
);
});
it("normal announce success delivers exactly once and sets deliveryAttempted=true", async () => { it("normal announce success delivers exactly once and sets deliveryAttempted=true", async () => {
vi.mocked(countActiveDescendantRuns).mockReturnValue(0); vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false); vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);

View File

@@ -1,6 +1,8 @@
import fs from "node:fs/promises"; import fs from "node:fs/promises";
import { describe, expect, it, vi } from "vitest"; import { describe, expect, it, vi } from "vitest";
import type { HeartbeatRunResult } from "../infra/heartbeat-wake.js"; import type { HeartbeatRunResult } from "../infra/heartbeat-wake.js";
import { clearCommandLane, setCommandLaneConcurrency } from "../process/command-queue.js";
import { CommandLane } from "../process/lanes.js";
import * as schedule from "./schedule.js"; import * as schedule from "./schedule.js";
import { import {
createAbortAwareIsolatedRunner, createAbortAwareIsolatedRunner,
@@ -15,9 +17,13 @@ import {
writeCronStoreSnapshot, writeCronStoreSnapshot,
} from "./service.issue-regressions.test-helpers.js"; } from "./service.issue-regressions.test-helpers.js";
import { CronService } from "./service.js"; import { CronService } from "./service.js";
import { createDeferred, createRunningCronServiceState } from "./service.test-harness.js"; import {
createDeferred,
createNoopLogger,
createRunningCronServiceState,
} from "./service.test-harness.js";
import { computeJobNextRunAtMs } from "./service/jobs.js"; import { computeJobNextRunAtMs } from "./service/jobs.js";
import { run } from "./service/ops.js"; import { enqueueRun, run } from "./service/ops.js";
import { createCronServiceState, type CronEvent } from "./service/state.js"; import { createCronServiceState, type CronEvent } from "./service/state.js";
import { import {
DEFAULT_JOB_TIMEOUT_MS, DEFAULT_JOB_TIMEOUT_MS,
@@ -1486,6 +1492,110 @@ describe("Cron issue regressions", () => {
expect(jobs.find((job) => job.id === second.id)?.state.lastStatus).toBe("ok"); expect(jobs.find((job) => job.id === second.id)?.state.lastStatus).toBe("ok");
}); });
it("queues manual cron.run requests behind the cron execution lane", async () => {
vi.useRealTimers();
clearCommandLane(CommandLane.Cron);
setCommandLaneConcurrency(CommandLane.Cron, 1);
const store = makeStorePath();
const dueAt = Date.parse("2026-02-06T10:05:02.000Z");
const first = createDueIsolatedJob({ id: "queued-first", nowMs: dueAt, nextRunAtMs: dueAt });
const second = createDueIsolatedJob({
id: "queued-second",
nowMs: dueAt,
nextRunAtMs: dueAt,
});
await fs.writeFile(
store.storePath,
JSON.stringify({ version: 1, jobs: [first, second] }),
"utf-8",
);
let now = dueAt;
let activeRuns = 0;
let peakActiveRuns = 0;
const firstRun = createDeferred<{ status: "ok"; summary: string }>();
const secondRun = createDeferred<{ status: "ok"; summary: string }>();
const secondStarted = createDeferred<void>();
const runIsolatedAgentJob = vi.fn(async (params: { job: { id: string } }) => {
activeRuns += 1;
peakActiveRuns = Math.max(peakActiveRuns, activeRuns);
if (params.job.id === second.id) {
secondStarted.resolve();
}
try {
const result =
params.job.id === first.id ? await firstRun.promise : await secondRun.promise;
now += 10;
return result;
} finally {
activeRuns -= 1;
}
});
const state = createCronServiceState({
cronEnabled: true,
storePath: store.storePath,
cronConfig: { maxConcurrentRuns: 1 },
log: createNoopLogger(),
nowMs: () => now,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob,
});
const firstAck = await enqueueRun(state, first.id, "force");
const secondAck = await enqueueRun(state, second.id, "force");
expect(firstAck).toEqual({ ok: true, enqueued: true, runId: expect.any(String) });
expect(secondAck).toEqual({ ok: true, enqueued: true, runId: expect.any(String) });
await vi.waitFor(() => expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1));
expect(runIsolatedAgentJob.mock.calls[0]?.[0]).toMatchObject({ job: { id: first.id } });
expect(peakActiveRuns).toBe(1);
firstRun.resolve({ status: "ok", summary: "first queued run" });
await secondStarted.promise;
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(2);
expect(runIsolatedAgentJob.mock.calls[1]?.[0]).toMatchObject({ job: { id: second.id } });
expect(peakActiveRuns).toBe(1);
secondRun.resolve({ status: "ok", summary: "second queued run" });
await vi.waitFor(() => {
const jobs = state.store?.jobs ?? [];
expect(jobs.find((job) => job.id === first.id)?.state.lastStatus).toBe("ok");
expect(jobs.find((job) => job.id === second.id)?.state.lastStatus).toBe("ok");
});
clearCommandLane(CommandLane.Cron);
});
it("logs unexpected queued manual run background failures once", async () => {
vi.useRealTimers();
clearCommandLane(CommandLane.Cron);
setCommandLaneConcurrency(CommandLane.Cron, 1);
const dueAt = Date.parse("2026-02-06T10:05:03.000Z");
const job = createDueIsolatedJob({ id: "queued-failure", nowMs: dueAt, nextRunAtMs: dueAt });
const log = createNoopLogger();
const badStore = `${makeStorePath().storePath}.dir`;
await fs.mkdir(badStore, { recursive: true });
const state = createRunningCronServiceState({
storePath: badStore,
log,
nowMs: () => dueAt,
jobs: [job],
});
const result = await enqueueRun(state, job.id, "force");
expect(result).toEqual({ ok: true, enqueued: true, runId: expect.any(String) });
await vi.waitFor(() => expect(log.error).toHaveBeenCalledTimes(1));
expect(log.error.mock.calls[0]?.[1]).toBe(
"cron: queued manual run background execution failed",
);
clearCommandLane(CommandLane.Cron);
});
// Regression: isolated cron runs must not abort at 1/3 of configured timeoutSeconds. // Regression: isolated cron runs must not abort at 1/3 of configured timeoutSeconds.
// The bug (issue #29774) caused the CLI-provider resume watchdog (ratio 0.3, maxMs 180 s) // The bug (issue #29774) caused the CLI-provider resume watchdog (ratio 0.3, maxMs 180 s)
// to be applied on fresh sessions because a persisted cliSessionId was passed to // to be applied on fresh sessions because a persisted cliSessionId was passed to

View File

@@ -46,6 +46,10 @@ export class CronService {
return await ops.run(this.state, id, mode); return await ops.run(this.state, id, mode);
} }
async enqueueRun(id: string, mode?: "due" | "force") {
return await ops.enqueueRun(this.state, id, mode);
}
getJob(id: string): CronJob | undefined { getJob(id: string): CronJob | undefined {
return this.state.store?.jobs.find((job) => job.id === id); return this.state.store?.jobs.find((job) => job.id === id);
} }

View File

@@ -1,3 +1,5 @@
import { enqueueCommandInLane } from "../../process/command-queue.js";
import { CommandLane } from "../../process/lanes.js";
import type { CronJob, CronJobCreate, CronJobPatch } from "../types.js"; import type { CronJob, CronJobCreate, CronJobPatch } from "../types.js";
import { normalizeCronCreateDeliveryInput } from "./initial-delivery.js"; import { normalizeCronCreateDeliveryInput } from "./initial-delivery.js";
import { import {
@@ -339,8 +341,58 @@ export async function remove(state: CronServiceState, id: string) {
}); });
} }
export async function run(state: CronServiceState, id: string, mode?: "due" | "force") { type PreparedManualRun =
const prepared = await locked(state, async () => { | {
ok: true;
ran: false;
reason: "already-running" | "not-due";
}
| {
ok: true;
ran: true;
jobId: string;
startedAt: number;
executionJob: CronJob;
}
| { ok: false };
type ManualRunDisposition =
| Extract<PreparedManualRun, { ran: false }>
| { ok: true; runnable: true };
let nextManualRunId = 1;
async function inspectManualRunDisposition(
state: CronServiceState,
id: string,
mode?: "due" | "force",
): Promise<ManualRunDisposition | { ok: false }> {
return await locked(state, async () => {
warnIfDisabled(state, "run");
await ensureLoaded(state, { skipRecompute: true });
// Normalize job tick state (clears stale runningAtMs markers) before
// checking if already running, so a stale marker from a crashed Phase-1
// persist does not block manual triggers for up to STUCK_RUN_MS (#17554).
recomputeNextRunsForMaintenance(state);
const job = findJobOrThrow(state, id);
if (typeof job.state.runningAtMs === "number") {
return { ok: true, ran: false, reason: "already-running" as const };
}
const now = state.deps.nowMs();
const due = isJobDue(job, now, { forced: mode === "force" });
if (!due) {
return { ok: true, ran: false, reason: "not-due" as const };
}
return { ok: true, runnable: true } as const;
});
}
async function prepareManualRun(
state: CronServiceState,
id: string,
mode?: "due" | "force",
): Promise<PreparedManualRun> {
return await locked(state, async () => {
warnIfDisabled(state, "run"); warnIfDisabled(state, "run");
await ensureLoaded(state, { skipRecompute: true }); await ensureLoaded(state, { skipRecompute: true });
// Normalize job tick state (clears stale runningAtMs markers) before // Normalize job tick state (clears stale runningAtMs markers) before
@@ -365,7 +417,7 @@ export async function run(state: CronServiceState, id: string, mode?: "due" | "f
// force-reload from disk cannot start the same job concurrently. // force-reload from disk cannot start the same job concurrently.
await persist(state); await persist(state);
emit(state, { jobId: job.id, action: "started", runAtMs: now }); emit(state, { jobId: job.id, action: "started", runAtMs: now });
const executionJob = JSON.parse(JSON.stringify(job)) as typeof job; const executionJob = JSON.parse(JSON.stringify(job)) as CronJob;
return { return {
ok: true, ok: true,
ran: true, ran: true,
@@ -374,13 +426,13 @@ export async function run(state: CronServiceState, id: string, mode?: "due" | "f
executionJob, executionJob,
} as const; } as const;
}); });
}
if (!prepared.ran) { async function finishPreparedManualRun(
return prepared; state: CronServiceState,
} prepared: Extract<PreparedManualRun, { ran: true }>,
if (!prepared.executionJob || typeof prepared.startedAt !== "number") { mode?: "due" | "force",
return { ok: false } as const; ): Promise<void> {
}
const executionJob = prepared.executionJob; const executionJob = prepared.executionJob;
const startedAt = prepared.startedAt; const startedAt = prepared.startedAt;
const jobId = prepared.jobId; const jobId = prepared.jobId;
@@ -461,10 +513,54 @@ export async function run(state: CronServiceState, id: string, mode?: "due" | "f
await persist(state); await persist(state);
armTimer(state); armTimer(state);
}); });
}
export async function run(state: CronServiceState, id: string, mode?: "due" | "force") {
const prepared = await prepareManualRun(state, id, mode);
if (!prepared.ok || !prepared.ran) {
return prepared;
}
await finishPreparedManualRun(state, prepared, mode);
return { ok: true, ran: true } as const; return { ok: true, ran: true } as const;
} }
export async function enqueueRun(state: CronServiceState, id: string, mode?: "due" | "force") {
const disposition = await inspectManualRunDisposition(state, id, mode);
if (!disposition.ok || !("runnable" in disposition && disposition.runnable)) {
return disposition;
}
const runId = `manual:${id}:${state.deps.nowMs()}:${nextManualRunId++}`;
void enqueueCommandInLane(
CommandLane.Cron,
async () => {
const result = await run(state, id, mode);
if (result.ok && "ran" in result && !result.ran) {
state.deps.log.info(
{ jobId: id, runId, reason: result.reason },
"cron: queued manual run skipped before execution",
);
}
return result;
},
{
warnAfterMs: 5_000,
onWait: (waitMs, queuedAhead) => {
state.deps.log.warn(
{ jobId: id, runId, waitMs, queuedAhead },
"cron: queued manual run waiting for an execution slot",
);
},
},
).catch((err) => {
state.deps.log.error(
{ jobId: id, runId, err: String(err) },
"cron: queued manual run background execution failed",
);
});
return { ok: true, enqueued: true, runId } as const;
}
export function wakeNow( export function wakeNow(
state: CronServiceState, state: CronServiceState,
opts: { mode: "now" | "next-heartbeat"; text: string }, opts: { mode: "now" | "next-heartbeat"; text: string },

View File

@@ -142,6 +142,7 @@ export type CronStatusSummary = {
export type CronRunResult = export type CronRunResult =
| { ok: true; ran: true } | { ok: true; ran: true }
| { ok: true; enqueued: true; runId: string }
| { ok: true; ran: false; reason: "not-due" } | { ok: true; ran: false; reason: "not-due" }
| { ok: true; ran: false; reason: "already-running" } | { ok: true; ran: false; reason: "already-running" }
| { ok: false }; | { ok: false };

View File

@@ -212,7 +212,7 @@ export const cronHandlers: GatewayRequestHandlers = {
); );
return; return;
} }
const result = await context.cron.run(jobId, p.mode ?? "force"); const result = await context.cron.enqueueRun(jobId, p.mode ?? "force");
respond(true, result, undefined); respond(true, result, undefined);
}, },
"cron.runs": async ({ params, respond, context }) => { "cron.runs": async ({ params, respond, context }) => {

View File

@@ -9,6 +9,7 @@ import {
connectOk, connectOk,
cronIsolatedRun, cronIsolatedRun,
installGatewayTestHooks, installGatewayTestHooks,
onceMessage,
rpcReq, rpcReq,
startServerWithClient, startServerWithClient,
testState, testState,
@@ -35,7 +36,6 @@ vi.mock("../infra/net/fetch-guard.js", () => ({
})); }));
installGatewayTestHooks({ scope: "suite" }); installGatewayTestHooks({ scope: "suite" });
const CRON_WAIT_INTERVAL_MS = 5;
const CRON_WAIT_TIMEOUT_MS = 3_000; const CRON_WAIT_TIMEOUT_MS = 3_000;
const EMPTY_CRON_STORE_CONTENT = JSON.stringify({ version: 1, jobs: [] }); const EMPTY_CRON_STORE_CONTENT = JSON.stringify({ version: 1, jobs: [] });
let cronSuiteTempRootPromise: Promise<string> | null = null; let cronSuiteTempRootPromise: Promise<string> | null = null;
@@ -69,16 +69,20 @@ async function rmTempDir(dir: string) {
await fs.rm(dir, { recursive: true, force: true }); await fs.rm(dir, { recursive: true, force: true });
} }
async function waitForCondition(check: () => boolean | Promise<boolean>, timeoutMs = 2000) { async function waitForCronEvent(
await vi.waitFor( ws: WebSocket,
async () => { check: (payload: Record<string, unknown> | null) => boolean,
const ok = await check(); timeoutMs = CRON_WAIT_TIMEOUT_MS,
if (!ok) { ) {
throw new Error("condition not met"); const message = await onceMessage(
} ws,
(obj) => {
const payload = obj.payload ?? null;
return obj.type === "event" && obj.event === "cron" && check(payload);
}, },
{ timeout: timeoutMs, interval: CRON_WAIT_INTERVAL_MS }, timeoutMs,
); );
return message.payload ?? null;
} }
async function createCronCasePaths(tempPrefix: string): Promise<{ async function createCronCasePaths(tempPrefix: string): Promise<{
@@ -178,6 +182,8 @@ async function addWebhookCronJob(params: {
async function runCronJobForce(ws: WebSocket, id: string) { async function runCronJobForce(ws: WebSocket, id: string) {
const response = await rpcReq(ws, "cron.run", { id, mode: "force" }, 20_000); const response = await rpcReq(ws, "cron.run", { id, mode: "force" }, 20_000);
expect(response.ok).toBe(true); expect(response.ok).toBe(true);
expect(response.payload).toEqual({ ok: true, enqueued: true, runId: expect.any(String) });
return response;
} }
function getWebhookCall(index: number) { function getWebhookCall(index: number) {
@@ -263,6 +269,7 @@ describe("gateway server cron", () => {
const runRes = await rpcReq(ws, "cron.run", { id: routeJobId, mode: "force" }, 20_000); const runRes = await rpcReq(ws, "cron.run", { id: routeJobId, mode: "force" }, 20_000);
expect(runRes.ok).toBe(true); expect(runRes.ok).toBe(true);
expect(runRes.payload).toEqual({ ok: true, enqueued: true, runId: expect.any(String) });
const events = await waitForSystemEvent(); const events = await waitForSystemEvent();
expect(events.some((event) => event.includes("cron route check"))).toBe(true); expect(events.some((event) => event.includes("cron route check"))).toBe(true);
@@ -441,7 +448,7 @@ describe("gateway server cron", () => {
}); });
test("writes cron run history and auto-runs due jobs", async () => { test("writes cron run history and auto-runs due jobs", async () => {
const { prevSkipCron, dir } = await setupCronTestRun({ const { prevSkipCron } = await setupCronTestRun({
tempPrefix: "openclaw-gw-cron-log-", tempPrefix: "openclaw-gw-cron-log-",
}); });
@@ -463,31 +470,21 @@ describe("gateway server cron", () => {
const jobId = typeof jobIdValue === "string" ? jobIdValue : ""; const jobId = typeof jobIdValue === "string" ? jobIdValue : "";
expect(jobId.length > 0).toBe(true); expect(jobId.length > 0).toBe(true);
const finishedRun = waitForCronEvent(
ws,
(payload) => payload?.jobId === jobId && payload?.action === "finished",
);
const runRes = await rpcReq(ws, "cron.run", { id: jobId, mode: "force" }, 20_000); const runRes = await rpcReq(ws, "cron.run", { id: jobId, mode: "force" }, 20_000);
expect(runRes.ok).toBe(true); expect(runRes.ok).toBe(true);
const logPath = path.join(dir, "cron", "runs", `${jobId}.jsonl`); expect(runRes.payload).toEqual({ ok: true, enqueued: true, runId: expect.any(String) });
let raw = ""; const finishedPayload = await finishedRun;
await waitForCondition(async () => { expect(finishedPayload).toMatchObject({
raw = await fs.readFile(logPath, "utf-8").catch(() => ""); jobId,
return raw.trim().length > 0; action: "finished",
}, CRON_WAIT_TIMEOUT_MS); status: "ok",
const line = raw summary: "hello",
.split("\n") deliveryStatus: "not-requested",
.map((l) => l.trim()) });
.filter(Boolean)
.at(-1);
const last = JSON.parse(line ?? "{}") as {
jobId?: unknown;
action?: unknown;
status?: unknown;
summary?: unknown;
deliveryStatus?: unknown;
};
expect(last.action).toBe("finished");
expect(last.jobId).toBe(jobId);
expect(last.status).toBe("ok");
expect(last.summary).toBe("hello");
expect(last.deliveryStatus).toBe("not-requested");
const runsRes = await rpcReq(ws, "cron.runs", { id: jobId, limit: 50 }); const runsRes = await rpcReq(ws, "cron.runs", { id: jobId, limit: 50 });
expect(runsRes.ok).toBe(true); expect(runsRes.ok).toBe(true);
@@ -522,7 +519,7 @@ describe("gateway server cron", () => {
const autoRes = await rpcReq(ws, "cron.add", { const autoRes = await rpcReq(ws, "cron.add", {
name: "auto run test", name: "auto run test",
enabled: true, enabled: true,
schedule: { kind: "at", at: new Date(Date.now() + 50).toISOString() }, schedule: { kind: "at", at: new Date(Date.now() + 200).toISOString() },
sessionTarget: "main", sessionTarget: "main",
wakeMode: "next-heartbeat", wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "auto" }, payload: { kind: "systemEvent", text: "auto" },
@@ -532,11 +529,10 @@ describe("gateway server cron", () => {
const autoJobId = typeof autoJobIdValue === "string" ? autoJobIdValue : ""; const autoJobId = typeof autoJobIdValue === "string" ? autoJobIdValue : "";
expect(autoJobId.length > 0).toBe(true); expect(autoJobId.length > 0).toBe(true);
await waitForCondition(async () => { await waitForCronEvent(
const runsRes = await rpcReq(ws, "cron.runs", { id: autoJobId, limit: 10 }); ws,
const runsPayload = runsRes.payload as { entries?: unknown } | undefined; (payload) => payload?.jobId === autoJobId && payload?.action === "finished",
return Array.isArray(runsPayload?.entries) && runsPayload.entries.length > 0; );
}, CRON_WAIT_TIMEOUT_MS);
const autoEntries = (await rpcReq(ws, "cron.runs", { id: autoJobId, limit: 10 })).payload as const autoEntries = (await rpcReq(ws, "cron.runs", { id: autoJobId, limit: 10 })).payload as
| { entries?: Array<{ jobId?: unknown }> } | { entries?: Array<{ jobId?: unknown }> }
| undefined; | undefined;
@@ -548,6 +544,162 @@ describe("gateway server cron", () => {
} }
}, 45_000); }, 45_000);
test("returns from cron.run immediately while isolated work continues in background", async () => {
const { prevSkipCron } = await setupCronTestRun({
tempPrefix: "openclaw-gw-cron-run-detached-",
});
const { server, ws } = await startServerWithClient();
await connectOk(ws);
let resolveRun: ((value: { status: "ok"; summary: string }) => void) | undefined;
cronIsolatedRun.mockImplementationOnce(
() =>
new Promise((resolve) => {
resolveRun = resolve as (value: { status: "ok"; summary: string }) => void;
}),
);
try {
const addRes = await rpcReq(ws, "cron.add", {
name: "detached run test",
enabled: true,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
payload: { kind: "agentTurn", message: "do work" },
delivery: { mode: "none" },
});
expect(addRes.ok).toBe(true);
const jobIdValue = (addRes.payload as { id?: unknown } | null)?.id;
const jobId = typeof jobIdValue === "string" ? jobIdValue : "";
expect(jobId.length > 0).toBe(true);
const startedRun = waitForCronEvent(
ws,
(payload) => payload?.jobId === jobId && payload?.action === "started",
);
const finishedRun = waitForCronEvent(
ws,
(payload) => payload?.jobId === jobId && payload?.action === "finished",
);
const runRes = await rpcReq(ws, "cron.run", { id: jobId, mode: "force" }, 1_000);
expect(runRes.ok).toBe(true);
expect(runRes.payload).toEqual({ ok: true, enqueued: true, runId: expect.any(String) });
await startedRun;
expect(cronIsolatedRun).toHaveBeenCalledTimes(1);
resolveRun?.({ status: "ok", summary: "background finished" });
const finishedPayload = await finishedRun;
expect(finishedPayload).toMatchObject({
jobId,
action: "finished",
status: "ok",
summary: "background finished",
});
} finally {
await cleanupCronTestRun({ ws, server, prevSkipCron });
}
});
test("returns already-running without starting background work", async () => {
const now = Date.now();
let resolveRun: ((result: { status: "ok"; summary: string }) => void) | undefined;
cronIsolatedRun.mockImplementationOnce(
() =>
new Promise((resolve) => {
resolveRun = resolve;
}),
);
const { prevSkipCron } = await setupCronTestRun({
tempPrefix: "openclaw-gw-cron-run-busy-",
jobs: [
{
id: "busy-job",
name: "busy job",
enabled: true,
createdAtMs: now - 60_000,
updatedAtMs: now - 60_000,
schedule: { kind: "at", at: new Date(now + 60_000).toISOString() },
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
payload: { kind: "agentTurn", message: "still busy" },
delivery: { mode: "none" },
state: {
nextRunAtMs: now + 60_000,
},
},
],
});
const { server, ws } = await startServerWithClient();
await connectOk(ws);
try {
const startedRun = waitForCronEvent(
ws,
(payload) => payload?.jobId === "busy-job" && payload?.action === "started",
);
const firstRunRes = await rpcReq(ws, "cron.run", { id: "busy-job", mode: "force" }, 1_000);
expect(firstRunRes.ok).toBe(true);
expect(firstRunRes.payload).toEqual({ ok: true, enqueued: true, runId: expect.any(String) });
await startedRun;
expect(cronIsolatedRun).toHaveBeenCalledTimes(1);
const secondRunRes = await rpcReq(ws, "cron.run", { id: "busy-job", mode: "force" }, 1_000);
expect(secondRunRes.ok).toBe(true);
expect(secondRunRes.payload).toEqual({ ok: true, ran: false, reason: "already-running" });
expect(cronIsolatedRun).toHaveBeenCalledTimes(1);
const finishedRun = waitForCronEvent(
ws,
(payload) => payload?.jobId === "busy-job" && payload?.action === "finished",
);
resolveRun?.({ status: "ok", summary: "busy done" });
await finishedRun;
} finally {
await cleanupCronTestRun({ ws, server, prevSkipCron });
}
});
test("returns not-due without starting background work", async () => {
const now = Date.now();
const { prevSkipCron } = await setupCronTestRun({
tempPrefix: "openclaw-gw-cron-run-not-due-",
jobs: [
{
id: "future-job",
name: "future job",
enabled: true,
createdAtMs: now - 60_000,
updatedAtMs: now - 60_000,
schedule: { kind: "at", at: new Date(now + 60_000).toISOString() },
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
payload: { kind: "agentTurn", message: "not yet" },
delivery: { mode: "none" },
state: {
nextRunAtMs: now + 60_000,
},
},
],
});
const { server, ws } = await startServerWithClient();
await connectOk(ws);
cronIsolatedRun.mockClear();
try {
const runRes = await rpcReq(ws, "cron.run", { id: "future-job", mode: "due" }, 1_000);
expect(runRes.ok).toBe(true);
expect(runRes.payload).toEqual({ ok: true, ran: false, reason: "not-due" });
expect(cronIsolatedRun).not.toHaveBeenCalled();
} finally {
await cleanupCronTestRun({ ws, server, prevSkipCron });
}
});
test("posts webhooks for delivery mode and legacy notify fallback only when summary exists", async () => { test("posts webhooks for delivery mode and legacy notify fallback only when summary exists", async () => {
const legacyNotifyJob = { const legacyNotifyJob = {
id: "legacy-notify-job", id: "legacy-notify-job",
@@ -608,12 +760,12 @@ describe("gateway server cron", () => {
name: "webhook enabled", name: "webhook enabled",
delivery: { mode: "webhook", to: "https://example.invalid/cron-finished" }, delivery: { mode: "webhook", to: "https://example.invalid/cron-finished" },
}); });
await runCronJobForce(ws, notifyJobId); const notifyFinished = waitForCronEvent(
ws,
await waitForCondition( (payload) => payload?.jobId === notifyJobId && payload?.action === "finished",
() => fetchWithSsrFGuardMock.mock.calls.length === 1,
CRON_WAIT_TIMEOUT_MS,
); );
await runCronJobForce(ws, notifyJobId);
await notifyFinished;
const notifyCall = getWebhookCall(0); const notifyCall = getWebhookCall(0);
expect(notifyCall.url).toBe("https://example.invalid/cron-finished"); expect(notifyCall.url).toBe("https://example.invalid/cron-finished");
expect(notifyCall.init.method).toBe("POST"); expect(notifyCall.init.method).toBe("POST");
@@ -623,6 +775,10 @@ describe("gateway server cron", () => {
expect(notifyBody.action).toBe("finished"); expect(notifyBody.action).toBe("finished");
expect(notifyBody.jobId).toBe(notifyJobId); expect(notifyBody.jobId).toBe(notifyJobId);
const legacyFinished = waitForCronEvent(
ws,
(payload) => payload?.jobId === "legacy-notify-job" && payload?.action === "finished",
);
const legacyRunRes = await rpcReq( const legacyRunRes = await rpcReq(
ws, ws,
"cron.run", "cron.run",
@@ -630,10 +786,8 @@ describe("gateway server cron", () => {
20_000, 20_000,
); );
expect(legacyRunRes.ok).toBe(true); expect(legacyRunRes.ok).toBe(true);
await waitForCondition( expect(legacyRunRes.payload).toEqual({ ok: true, enqueued: true, runId: expect.any(String) });
() => fetchWithSsrFGuardMock.mock.calls.length === 2, await legacyFinished;
CRON_WAIT_TIMEOUT_MS,
);
const legacyCall = getWebhookCall(1); const legacyCall = getWebhookCall(1);
expect(legacyCall.url).toBe("https://legacy.example.invalid/cron-finished"); expect(legacyCall.url).toBe("https://legacy.example.invalid/cron-finished");
expect(legacyCall.init.method).toBe("POST"); expect(legacyCall.init.method).toBe("POST");
@@ -655,10 +809,14 @@ describe("gateway server cron", () => {
const silentJobId = typeof silentJobIdValue === "string" ? silentJobIdValue : ""; const silentJobId = typeof silentJobIdValue === "string" ? silentJobIdValue : "";
expect(silentJobId.length > 0).toBe(true); expect(silentJobId.length > 0).toBe(true);
const silentFinished = waitForCronEvent(
ws,
(payload) => payload?.jobId === silentJobId && payload?.action === "finished",
);
const silentRunRes = await rpcReq(ws, "cron.run", { id: silentJobId, mode: "force" }, 20_000); const silentRunRes = await rpcReq(ws, "cron.run", { id: silentJobId, mode: "force" }, 20_000);
expect(silentRunRes.ok).toBe(true); expect(silentRunRes.ok).toBe(true);
await yieldToEventLoop(); expect(silentRunRes.payload).toEqual({ ok: true, enqueued: true, runId: expect.any(String) });
await yieldToEventLoop(); await silentFinished;
expect(fetchWithSsrFGuardMock).toHaveBeenCalledTimes(2); expect(fetchWithSsrFGuardMock).toHaveBeenCalledTimes(2);
fetchWithSsrFGuardMock.mockClear(); fetchWithSsrFGuardMock.mockClear();
@@ -677,11 +835,12 @@ describe("gateway server cron", () => {
}, },
}, },
}); });
await runCronJobForce(ws, failureDestJobId); const failureDestFinished = waitForCronEvent(
await waitForCondition( ws,
() => fetchWithSsrFGuardMock.mock.calls.length === 1, (payload) => payload?.jobId === failureDestJobId && payload?.action === "finished",
CRON_WAIT_TIMEOUT_MS,
); );
await runCronJobForce(ws, failureDestJobId);
await failureDestFinished;
const failureDestCall = getWebhookCall(0); const failureDestCall = getWebhookCall(0);
expect(failureDestCall.url).toBe("https://example.invalid/failure-destination"); expect(failureDestCall.url).toBe("https://example.invalid/failure-destination");
const failureDestBody = failureDestCall.body; const failureDestBody = failureDestCall.body;
@@ -696,9 +855,12 @@ describe("gateway server cron", () => {
sessionTarget: "isolated", sessionTarget: "isolated",
delivery: { mode: "webhook", to: "https://example.invalid/cron-finished" }, delivery: { mode: "webhook", to: "https://example.invalid/cron-finished" },
}); });
const noSummaryFinished = waitForCronEvent(
ws,
(payload) => payload?.jobId === noSummaryJobId && payload?.action === "finished",
);
await runCronJobForce(ws, noSummaryJobId); await runCronJobForce(ws, noSummaryJobId);
await yieldToEventLoop(); await noSummaryFinished;
await yieldToEventLoop();
expect(fetchWithSsrFGuardMock).toHaveBeenCalledTimes(1); expect(fetchWithSsrFGuardMock).toHaveBeenCalledTimes(1);
} finally { } finally {
await cleanupCronTestRun({ ws, server, prevSkipCron }); await cleanupCronTestRun({ ws, server, prevSkipCron });
@@ -741,12 +903,12 @@ describe("gateway server cron", () => {
name: "webhook secretinput object", name: "webhook secretinput object",
delivery: { mode: "webhook", to: "https://example.invalid/cron-finished" }, delivery: { mode: "webhook", to: "https://example.invalid/cron-finished" },
}); });
await runCronJobForce(ws, notifyJobId); const notifyFinished = waitForCronEvent(
ws,
await waitForCondition( (payload) => payload?.jobId === notifyJobId && payload?.action === "finished",
() => fetchWithSsrFGuardMock.mock.calls.length === 1,
CRON_WAIT_TIMEOUT_MS,
); );
await runCronJobForce(ws, notifyJobId);
await notifyFinished;
const [notifyArgs] = fetchWithSsrFGuardMock.mock.calls[0] as unknown as [ const [notifyArgs] = fetchWithSsrFGuardMock.mock.calls[0] as unknown as [
{ {
url?: string; url?: string;