test: split cron service regression ownership

This commit is contained in:
Shakker
2026-04-02 00:59:17 +01:00
committed by Peter Steinberger
parent deb70e7e25
commit 33248980d9
5 changed files with 1727 additions and 1692 deletions

View File

@@ -1,151 +1,27 @@
import crypto from "node:crypto";
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterAll, afterEach, beforeAll, beforeEach, vi } from "vitest";
import { clearAllBootstrapSnapshots } from "../agents/bootstrap-cache.js";
import { clearSessionStoreCacheForTest } from "../config/sessions/store.js";
import { resetAgentRunContextForTest } from "../infra/agent-events.js";
import { useFrozenTime, useRealTime } from "../test-utils/frozen-time.js";
import { vi } from "vitest";
import { CronService } from "./service.js";
import type { CronJob, CronJobState } from "./types.js";
const TOP_OF_HOUR_STAGGER_MS = 5 * 60 * 1_000;
export const noopLogger = {
info: () => {},
warn: () => {},
error: () => {},
debug: () => {},
trace: () => {},
};
let fixtureRoot = "";
let fixtureCount = 0;
import {
createDefaultIsolatedRunner,
noopLogger,
setupCronRegressionFixtures,
} from "./service.regression-fixtures.js";
export type CronServiceOptions = ConstructorParameters<typeof CronService>[0];
export function setupCronIssueRegressionFixtures() {
beforeAll(async () => {
fixtureRoot = await fs.mkdtemp(path.join(os.tmpdir(), "cron-issues-"));
});
export const setupCronIssueRegressionFixtures = () =>
setupCronRegressionFixtures({ prefix: "cron-issues-" });
beforeEach(() => {
useFrozenTime("2026-02-06T10:05:00.000Z");
});
afterEach(() => {
vi.clearAllTimers();
vi.restoreAllMocks();
useRealTime();
clearSessionStoreCacheForTest();
resetAgentRunContextForTest();
clearAllBootstrapSnapshots();
});
afterAll(async () => {
useRealTime();
await fs.rm(fixtureRoot, { recursive: true, force: true });
});
return {
makeStorePath,
};
}
export function topOfHourOffsetMs(jobId: string) {
const digest = crypto.createHash("sha256").update(jobId).digest();
return digest.readUInt32BE(0) % TOP_OF_HOUR_STAGGER_MS;
}
export function makeStorePath() {
const storePath = path.join(fixtureRoot, `case-${fixtureCount++}.jobs.json`);
return {
storePath,
};
}
export function createDueIsolatedJob(params: {
id: string;
nowMs: number;
nextRunAtMs: number;
deleteAfterRun?: boolean;
}): CronJob {
return {
id: params.id,
name: params.id,
enabled: true,
deleteAfterRun: params.deleteAfterRun ?? false,
createdAtMs: params.nowMs,
updatedAtMs: params.nowMs,
schedule: { kind: "at", at: new Date(params.nextRunAtMs).toISOString() },
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
payload: { kind: "agentTurn", message: params.id },
delivery: { mode: "none" },
state: { nextRunAtMs: params.nextRunAtMs },
};
}
export function createDefaultIsolatedRunner(): CronServiceOptions["runIsolatedAgentJob"] {
return vi.fn().mockResolvedValue({
status: "ok",
summary: "ok",
}) as CronServiceOptions["runIsolatedAgentJob"];
}
export function createAbortAwareIsolatedRunner(summary = "late") {
let observedAbortSignal: AbortSignal | undefined;
const runIsolatedAgentJob = vi.fn(async ({ abortSignal }) => {
observedAbortSignal = abortSignal;
await new Promise<void>((resolve) => {
if (!abortSignal) {
return;
}
if (abortSignal.aborted) {
resolve();
return;
}
abortSignal.addEventListener("abort", () => resolve(), { once: true });
});
return { status: "ok" as const, summary };
}) as CronServiceOptions["runIsolatedAgentJob"];
return {
runIsolatedAgentJob,
getObservedAbortSignal: () => observedAbortSignal,
};
}
export function createIsolatedRegressionJob(params: {
id: string;
name: string;
scheduledAt: number;
schedule: CronJob["schedule"];
payload: CronJob["payload"];
state?: CronJobState;
}): CronJob {
return {
id: params.id,
name: params.name,
enabled: true,
createdAtMs: params.scheduledAt - 86_400_000,
updatedAtMs: params.scheduledAt - 86_400_000,
schedule: params.schedule,
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
payload: params.payload,
delivery: { mode: "announce" },
state: params.state ?? {},
};
}
export async function writeCronJobs(storePath: string, jobs: CronJob[]) {
await fs.writeFile(storePath, JSON.stringify({ version: 1, jobs }), "utf-8");
}
export async function writeCronStoreSnapshot(storePath: string, jobs: unknown[]) {
await fs.writeFile(storePath, JSON.stringify({ version: 1, jobs }), "utf-8");
}
export {
createAbortAwareIsolatedRunner,
createDueIsolatedJob,
createIsolatedRegressionJob,
createRunningCronServiceState,
createDeferred,
noopLogger,
topOfHourOffsetMs,
writeCronJobs,
writeCronStoreSnapshot,
} from "./service.regression-fixtures.js";
export async function startCronForStore(params: {
storePath: string;

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,177 @@
import crypto from "node:crypto";
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterAll, afterEach, beforeAll, beforeEach, vi } from "vitest";
import { clearAllBootstrapSnapshots } from "../agents/bootstrap-cache.js";
import { clearSessionStoreCacheForTest } from "../config/sessions/store.js";
import { resetAgentRunContextForTest } from "../infra/agent-events.js";
import { useFrozenTime, useRealTime } from "../test-utils/frozen-time.js";
import { createCronServiceState, type CronServiceDeps } from "./service/state.js";
import type { CronJob, CronJobState } from "./types.js";
const TOP_OF_HOUR_STAGGER_MS = 5 * 60 * 1_000;
export const noopLogger = {
info: () => {},
warn: () => {},
error: () => {},
debug: () => {},
trace: () => {},
};
export function setupCronRegressionFixtures(options?: { prefix?: string; baseTimeIso?: string }) {
let fixtureRoot = "";
let fixtureCount = 0;
beforeAll(async () => {
fixtureRoot = await fs.mkdtemp(path.join(os.tmpdir(), options?.prefix ?? "cron-issues-"));
});
beforeEach(() => {
useFrozenTime(options?.baseTimeIso ?? "2026-02-06T10:05:00.000Z");
});
afterEach(() => {
vi.clearAllTimers();
vi.restoreAllMocks();
useRealTime();
clearSessionStoreCacheForTest();
resetAgentRunContextForTest();
clearAllBootstrapSnapshots();
});
afterAll(async () => {
useRealTime();
await fs.rm(fixtureRoot, { recursive: true, force: true });
});
return {
makeStorePath() {
return {
storePath: path.join(fixtureRoot, `case-${fixtureCount++}.jobs.json`),
};
},
};
}
export function createDeferred<T>() {
let resolve!: (value: T) => void;
let reject!: (reason?: unknown) => void;
const promise = new Promise<T>((res, rej) => {
resolve = res;
reject = rej;
});
return { promise, resolve, reject };
}
export function createRunningCronServiceState(params: {
storePath: string;
log: CronServiceDeps["log"];
nowMs: () => number;
jobs: CronJob[];
}) {
const state = createCronServiceState({
cronEnabled: true,
storePath: params.storePath,
log: params.log,
nowMs: params.nowMs,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn().mockResolvedValue({ status: "ok", summary: "ok" }),
});
state.running = true;
state.store = {
version: 1,
jobs: params.jobs,
};
return state;
}
export function topOfHourOffsetMs(jobId: string) {
const digest = crypto.createHash("sha256").update(jobId).digest();
return digest.readUInt32BE(0) % TOP_OF_HOUR_STAGGER_MS;
}
export function createDueIsolatedJob(params: {
id: string;
nowMs: number;
nextRunAtMs: number;
deleteAfterRun?: boolean;
}): CronJob {
return {
id: params.id,
name: params.id,
enabled: true,
deleteAfterRun: params.deleteAfterRun ?? false,
createdAtMs: params.nowMs,
updatedAtMs: params.nowMs,
schedule: { kind: "at", at: new Date(params.nextRunAtMs).toISOString() },
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
payload: { kind: "agentTurn", message: params.id },
delivery: { mode: "none" },
state: { nextRunAtMs: params.nextRunAtMs },
};
}
export function createDefaultIsolatedRunner(): CronServiceDeps["runIsolatedAgentJob"] {
return vi.fn().mockResolvedValue({
status: "ok",
summary: "ok",
}) as CronServiceDeps["runIsolatedAgentJob"];
}
export function createAbortAwareIsolatedRunner(summary = "late") {
let observedAbortSignal: AbortSignal | undefined;
const runIsolatedAgentJob = vi.fn(async ({ abortSignal }) => {
observedAbortSignal = abortSignal;
await new Promise<void>((resolve) => {
if (!abortSignal) {
return;
}
if (abortSignal.aborted) {
resolve();
return;
}
abortSignal.addEventListener("abort", () => resolve(), { once: true });
});
return { status: "ok" as const, summary };
}) as CronServiceDeps["runIsolatedAgentJob"];
return {
runIsolatedAgentJob,
getObservedAbortSignal: () => observedAbortSignal,
};
}
export function createIsolatedRegressionJob(params: {
id: string;
name: string;
scheduledAt: number;
schedule: CronJob["schedule"];
payload: CronJob["payload"];
state?: CronJobState;
}): CronJob {
return {
id: params.id,
name: params.name,
enabled: true,
createdAtMs: params.scheduledAt - 86_400_000,
updatedAtMs: params.scheduledAt - 86_400_000,
schedule: params.schedule,
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
payload: params.payload,
delivery: { mode: "announce" },
state: params.state ?? {},
};
}
export async function writeCronJobs(storePath: string, jobs: CronJob[]) {
await fs.writeFile(storePath, JSON.stringify({ version: 1, jobs }), "utf-8");
}
export async function writeCronStoreSnapshot(storePath: string, jobs: unknown[]) {
await fs.writeFile(storePath, JSON.stringify({ version: 1, jobs }), "utf-8");
}

View File

@@ -0,0 +1,393 @@
import fs from "node:fs/promises";
import { describe, expect, it, vi } from "vitest";
import { clearCommandLane, setCommandLaneConcurrency } from "../../process/command-queue.js";
import { CommandLane } from "../../process/lanes.js";
import {
createAbortAwareIsolatedRunner,
createDeferred,
createDueIsolatedJob,
createIsolatedRegressionJob,
createRunningCronServiceState,
noopLogger,
setupCronRegressionFixtures,
writeCronJobs,
} from "../service.regression-fixtures.js";
import { enqueueRun, run } from "./ops.js";
import type { CronEvent } from "./state.js";
import { createCronServiceState } from "./state.js";
import { onTimer } from "./timer.js";
const FAST_TIMEOUT_SECONDS = 0.0025;
const opsRegressionFixtures = setupCronRegressionFixtures({
prefix: "cron-service-ops-regressions-",
});
describe("cron service ops regressions", () => {
it("skips forced manual runs while a timer-triggered run is in progress", async () => {
const store = opsRegressionFixtures.makeStorePath();
const dueAt = Date.now() - 1;
const job = createIsolatedRegressionJob({
id: "timer-overlap",
name: "timer-overlap",
scheduledAt: dueAt,
schedule: { kind: "at", at: new Date(dueAt).toISOString() },
payload: { kind: "agentTurn", message: "long task" },
state: { nextRunAtMs: dueAt },
});
await writeCronJobs(store.storePath, [job]);
let resolveRun:
| ((value: { status: "ok" | "error" | "skipped"; summary?: string; error?: string }) => void)
| undefined;
const started = createDeferred<void>();
const finished = createDeferred<void>();
const runIsolatedAgentJob = vi.fn(
async () =>
await new Promise<{ status: "ok" | "error" | "skipped"; summary?: string; error?: string }>(
(resolve) => {
resolveRun = resolve;
},
),
);
const state = createCronServiceState({
cronEnabled: true,
storePath: store.storePath,
log: noopLogger,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob,
onEvent: (evt: CronEvent) => {
if (evt.jobId !== job.id) {
return;
}
if (evt.action === "started") {
started.resolve();
} else if (evt.action === "finished" && evt.status === "ok") {
finished.resolve();
}
},
});
const timerPromise = onTimer(state);
await started.promise;
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1);
const manualResult = await run(state, job.id, "force");
expect(manualResult).toEqual({ ok: true, ran: false, reason: "already-running" });
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1);
resolveRun?.({ status: "ok", summary: "done" });
await finished.promise;
await timerPromise;
});
it("does not double-run a job when cron.run overlaps a due timer tick", async () => {
const store = opsRegressionFixtures.makeStorePath();
const now = Date.parse("2026-02-06T10:05:00.000Z");
const job = createIsolatedRegressionJob({
id: "manual-overlap-no-double-run",
name: "manual overlap no double run",
scheduledAt: now,
schedule: { kind: "at", at: new Date(now).toISOString() },
payload: { kind: "agentTurn", message: "overlap" },
state: { nextRunAtMs: now },
});
await writeCronJobs(store.storePath, [job]);
const runStarted = createDeferred<void>();
const runFinished = createDeferred<void>();
const runResolvers: Array<
(value: { status: "ok" | "error" | "skipped"; summary?: string }) => void
> = [];
const runIsolatedAgentJob = vi.fn(async () => {
if (runIsolatedAgentJob.mock.calls.length === 1) {
runStarted.resolve();
}
return await new Promise<{ status: "ok" | "error" | "skipped"; summary?: string }>(
(resolve) => {
runResolvers.push(resolve);
},
);
});
const state = createCronServiceState({
cronEnabled: true,
storePath: store.storePath,
log: noopLogger,
nowMs: () => now,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob,
onEvent: (evt: CronEvent) => {
if (evt.jobId === job.id && evt.action === "finished") {
runFinished.resolve();
}
},
});
const manualRun = run(state, job.id, "force");
await runStarted.promise;
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1);
await onTimer(state);
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1);
runResolvers[0]?.({ status: "ok", summary: "done" });
await manualRun;
await runFinished.promise;
});
it("manual cron.run preserves unrelated due jobs but advances already-executed stale slots", async () => {
const store = opsRegressionFixtures.makeStorePath();
const nowMs = Date.now();
const dueNextRunAtMs = nowMs - 1_000;
const staleExecutedNextRunAtMs = nowMs - 2_000;
await writeCronJobs(store.storePath, [
createIsolatedRegressionJob({
id: "manual-target",
name: "manual target",
scheduledAt: nowMs,
schedule: { kind: "at", at: new Date(nowMs + 3_600_000).toISOString() },
payload: { kind: "agentTurn", message: "manual target" },
state: { nextRunAtMs: nowMs + 3_600_000 },
}),
createIsolatedRegressionJob({
id: "unrelated-due",
name: "unrelated due",
scheduledAt: nowMs,
schedule: { kind: "cron", expr: "*/5 * * * *", tz: "UTC" },
payload: { kind: "agentTurn", message: "unrelated due" },
state: { nextRunAtMs: dueNextRunAtMs },
}),
createIsolatedRegressionJob({
id: "unrelated-stale-executed",
name: "unrelated stale executed",
scheduledAt: nowMs,
schedule: { kind: "cron", expr: "*/5 * * * *", tz: "UTC" },
payload: { kind: "agentTurn", message: "unrelated stale executed" },
state: {
nextRunAtMs: staleExecutedNextRunAtMs,
lastRunAtMs: staleExecutedNextRunAtMs + 1,
},
}),
]);
const state = createCronServiceState({
cronEnabled: false,
storePath: store.storePath,
log: noopLogger,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn().mockResolvedValue({ status: "ok", summary: "ok" }),
});
const runResult = await run(state, "manual-target", "force");
expect(runResult).toEqual({ ok: true, ran: true });
const jobs = state.store?.jobs ?? [];
const unrelated = jobs.find((entry) => entry.id === "unrelated-due");
const staleExecuted = jobs.find((entry) => entry.id === "unrelated-stale-executed");
expect(unrelated?.state.nextRunAtMs).toBe(dueNextRunAtMs);
expect((staleExecuted?.state.nextRunAtMs ?? 0) > nowMs).toBe(true);
});
it("applies timeoutSeconds to manual cron.run isolated executions", async () => {
vi.useRealTimers();
const store = opsRegressionFixtures.makeStorePath();
const scheduledAt = Date.parse("2026-02-15T13:00:00.000Z");
const job = createIsolatedRegressionJob({
id: "manual-timeout",
name: "manual timeout",
scheduledAt,
schedule: { kind: "every", everyMs: 60_000, anchorMs: scheduledAt },
payload: { kind: "agentTurn", message: "work", timeoutSeconds: FAST_TIMEOUT_SECONDS },
state: { nextRunAtMs: scheduledAt },
});
await writeCronJobs(store.storePath, [job]);
const abortAwareRunner = createAbortAwareIsolatedRunner();
const state = createCronServiceState({
cronEnabled: false,
storePath: store.storePath,
log: noopLogger,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: abortAwareRunner.runIsolatedAgentJob,
});
const result = await run(state, job.id, "force");
expect(result).toEqual({ ok: true, ran: true });
expect(abortAwareRunner.getObservedAbortSignal()?.aborted).toBe(true);
const updated = state.store?.jobs.find((entry) => entry.id === job.id);
expect(updated?.state.lastStatus).toBe("error");
expect(updated?.state.lastError).toContain("timed out");
expect(updated?.state.runningAtMs).toBeUndefined();
});
it("#17554: run() clears stale runningAtMs and executes the job", async () => {
const store = opsRegressionFixtures.makeStorePath();
const now = Date.parse("2026-02-06T10:05:00.000Z");
const staleRunningAtMs = now - 2 * 60 * 60 * 1000 - 1;
await writeCronJobs(store.storePath, [
{
id: "stale-running",
name: "stale-running",
enabled: true,
createdAtMs: now - 3_600_000,
updatedAtMs: now - 3_600_000,
schedule: { kind: "at", at: new Date(now - 60_000).toISOString() },
sessionTarget: "main",
wakeMode: "now",
payload: { kind: "systemEvent", text: "stale-running" },
state: {
runningAtMs: staleRunningAtMs,
lastRunAtMs: now - 3_600_000,
lastStatus: "ok",
nextRunAtMs: now - 60_000,
},
},
]);
const enqueueSystemEvent = vi.fn();
const state = createCronServiceState({
cronEnabled: true,
storePath: store.storePath,
log: noopLogger,
nowMs: () => now,
enqueueSystemEvent,
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn().mockResolvedValue({ status: "ok", summary: "ok" }),
});
const result = await run(state, "stale-running", "force");
expect(result).toEqual({ ok: true, ran: true });
expect(enqueueSystemEvent).toHaveBeenCalledWith(
"stale-running",
expect.objectContaining({ agentId: undefined }),
);
});
it("queues manual cron.run requests behind the cron execution lane", async () => {
vi.useRealTimers();
clearCommandLane(CommandLane.Cron);
setCommandLaneConcurrency(CommandLane.Cron, 1);
const store = opsRegressionFixtures.makeStorePath();
const dueAt = Date.parse("2026-02-06T10:05:02.000Z");
const first = createDueIsolatedJob({ id: "queued-first", nowMs: dueAt, nextRunAtMs: dueAt });
const second = createDueIsolatedJob({
id: "queued-second",
nowMs: dueAt,
nextRunAtMs: dueAt,
});
await fs.writeFile(
store.storePath,
JSON.stringify({ version: 1, jobs: [first, second] }),
"utf-8",
);
let now = dueAt;
let activeRuns = 0;
let peakActiveRuns = 0;
const firstStarted = createDeferred<void>();
const firstRun = createDeferred<{ status: "ok"; summary: string }>();
const secondRun = createDeferred<{ status: "ok"; summary: string }>();
const secondStarted = createDeferred<void>();
const bothFinished = createDeferred<void>();
const runIsolatedAgentJob = vi.fn(async (params: { job: { id: string } }) => {
activeRuns += 1;
peakActiveRuns = Math.max(peakActiveRuns, activeRuns);
if (params.job.id === first.id) {
firstStarted.resolve();
}
if (params.job.id === second.id) {
secondStarted.resolve();
}
try {
const result =
params.job.id === first.id ? await firstRun.promise : await secondRun.promise;
now += 10;
return result;
} finally {
activeRuns -= 1;
}
});
const state = createCronServiceState({
cronEnabled: true,
storePath: store.storePath,
cronConfig: { maxConcurrentRuns: 1 },
log: noopLogger,
nowMs: () => now,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob,
onEvent: (evt) => {
if (evt.action === "finished" && evt.jobId === second.id && evt.status === "ok") {
bothFinished.resolve();
}
},
});
const firstAck = await enqueueRun(state, first.id, "force");
const secondAck = await enqueueRun(state, second.id, "force");
expect(firstAck).toEqual({ ok: true, enqueued: true, runId: expect.any(String) });
expect(secondAck).toEqual({ ok: true, enqueued: true, runId: expect.any(String) });
await firstStarted.promise;
expect(runIsolatedAgentJob.mock.calls[0]?.[0]).toMatchObject({ job: { id: first.id } });
expect(peakActiveRuns).toBe(1);
firstRun.resolve({ status: "ok", summary: "first queued run" });
await secondStarted.promise;
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(2);
expect(runIsolatedAgentJob.mock.calls[1]?.[0]).toMatchObject({ job: { id: second.id } });
expect(peakActiveRuns).toBe(1);
secondRun.resolve({ status: "ok", summary: "second queued run" });
await bothFinished.promise;
const jobs = state.store?.jobs ?? [];
expect(jobs.find((job) => job.id === first.id)?.state.lastStatus).toBe("ok");
expect(jobs.find((job) => job.id === second.id)?.state.lastStatus).toBe("ok");
clearCommandLane(CommandLane.Cron);
});
it("logs unexpected queued manual run background failures once", async () => {
vi.useRealTimers();
clearCommandLane(CommandLane.Cron);
setCommandLaneConcurrency(CommandLane.Cron, 1);
const dueAt = Date.parse("2026-02-06T10:05:03.000Z");
const job = createDueIsolatedJob({ id: "queued-failure", nowMs: dueAt, nextRunAtMs: dueAt });
const errorLogged = createDeferred<void>();
const log = {
...noopLogger,
error: vi.fn(() => {
errorLogged.resolve();
}),
};
const badStore = `${opsRegressionFixtures.makeStorePath().storePath}.dir`;
await fs.mkdir(badStore, { recursive: true });
const state = createRunningCronServiceState({
storePath: badStore,
log,
nowMs: () => dueAt,
jobs: [job],
});
const result = await enqueueRun(state, job.id, "force");
expect(result).toEqual({ ok: true, enqueued: true, runId: expect.any(String) });
await errorLogged.promise;
expect(log.error).toHaveBeenCalledTimes(1);
expect(log.error.mock.calls[0]?.[1]).toBe(
"cron: queued manual run background execution failed",
);
clearCommandLane(CommandLane.Cron);
});
});

File diff suppressed because it is too large Load Diff