refactor(cron): normalize legacy delivery at ingress

This commit is contained in:
Peter Steinberger
2026-03-08 00:38:24 +00:00
parent 9b99787c31
commit e66c418c45
6 changed files with 133 additions and 123 deletions

View File

@@ -42,6 +42,102 @@ export function buildDeliveryFromLegacyPayload(
return next;
}
export function buildDeliveryPatchFromLegacyPayload(payload: Record<string, unknown>) {
const deliver = payload.deliver;
const channelRaw =
typeof payload.channel === "string" && payload.channel.trim()
? payload.channel.trim().toLowerCase()
: typeof payload.provider === "string" && payload.provider.trim()
? payload.provider.trim().toLowerCase()
: "";
const toRaw = typeof payload.to === "string" ? payload.to.trim() : "";
const next: Record<string, unknown> = {};
let hasPatch = false;
if (deliver === false) {
next.mode = "none";
hasPatch = true;
} else if (
deliver === true ||
channelRaw ||
toRaw ||
typeof payload.bestEffortDeliver === "boolean"
) {
next.mode = "announce";
hasPatch = true;
}
if (channelRaw) {
next.channel = channelRaw;
hasPatch = true;
}
if (toRaw) {
next.to = toRaw;
hasPatch = true;
}
if (typeof payload.bestEffortDeliver === "boolean") {
next.bestEffort = payload.bestEffortDeliver;
hasPatch = true;
}
return hasPatch ? next : null;
}
export function mergeLegacyDeliveryInto(
delivery: Record<string, unknown>,
payload: Record<string, unknown>,
) {
const patch = buildDeliveryPatchFromLegacyPayload(payload);
if (!patch) {
return { delivery, mutated: false };
}
const next = { ...delivery };
let mutated = false;
if ("mode" in patch && patch.mode !== next.mode) {
next.mode = patch.mode;
mutated = true;
}
if ("channel" in patch && patch.channel !== next.channel) {
next.channel = patch.channel;
mutated = true;
}
if ("to" in patch && patch.to !== next.to) {
next.to = patch.to;
mutated = true;
}
if ("bestEffort" in patch && patch.bestEffort !== next.bestEffort) {
next.bestEffort = patch.bestEffort;
mutated = true;
}
return { delivery: next, mutated };
}
export function normalizeLegacyDeliveryInput(params: {
delivery?: Record<string, unknown> | null;
payload?: Record<string, unknown> | null;
}) {
if (!params.payload || !hasLegacyDeliveryHints(params.payload)) {
return {
delivery: params.delivery ?? undefined,
mutated: false,
};
}
const nextDelivery = params.delivery
? mergeLegacyDeliveryInto(params.delivery, params.payload)
: {
delivery: buildDeliveryFromLegacyPayload(params.payload),
mutated: true,
};
stripLegacyDeliveryFields(params.payload);
return {
delivery: nextDelivery.delivery,
mutated: true,
};
}
export function stripLegacyDeliveryFields(payload: Record<string, unknown>) {
if ("deliver" in payload) {
delete payload.deliver;

View File

@@ -1,10 +1,6 @@
import { sanitizeAgentId } from "../routing/session-key.js";
import { isRecord } from "../utils.js";
import {
buildDeliveryFromLegacyPayload,
hasLegacyDeliveryHints,
stripLegacyDeliveryFields,
} from "./legacy-delivery.js";
import { normalizeLegacyDeliveryInput } from "./legacy-delivery.js";
import { parseAbsoluteTimeMs } from "./parse.js";
import { migrateLegacyCronPayload } from "./payload-migration.js";
import { inferLegacyName } from "./service/normalize.js";
@@ -469,14 +465,20 @@ export function normalizeCronJobInput(
const isIsolatedAgentTurn =
sessionTarget === "isolated" || (sessionTarget === "" && payloadKind === "agentTurn");
const hasDelivery = "delivery" in next && next.delivery !== undefined;
const hasLegacyDelivery = payload ? hasLegacyDeliveryHints(payload) : false;
if (!hasDelivery && isIsolatedAgentTurn && payloadKind === "agentTurn") {
if (payload && hasLegacyDelivery) {
next.delivery = buildDeliveryFromLegacyPayload(payload);
stripLegacyDeliveryFields(payload);
} else {
next.delivery = { mode: "announce" };
}
const normalizedLegacy = normalizeLegacyDeliveryInput({
delivery: isRecord(next.delivery) ? next.delivery : null,
payload,
});
if (normalizedLegacy.mutated && normalizedLegacy.delivery) {
next.delivery = normalizedLegacy.delivery;
}
if (
!hasDelivery &&
!normalizedLegacy.delivery &&
isIsolatedAgentTurn &&
payloadKind === "agentTurn"
) {
next.delivery = { mode: "announce" };
}
}

View File

@@ -589,19 +589,6 @@ describe("createJob delivery defaults", () => {
expect(job.delivery).toEqual({ mode: "none" });
});
it("preserves legacy payload deliver=false when explicit delivery is omitted", () => {
const state = createMockState(now);
const job = createJob(state, {
name: "isolated-legacy-no-deliver",
enabled: true,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "isolated",
wakeMode: "now",
payload: { kind: "agentTurn", message: "hello", deliver: false } as never,
});
expect(job.delivery).toEqual({ mode: "none" });
});
it("does not set delivery for main systemEvent jobs without explicit delivery", () => {
const state = createMockState(now, { defaultAgentId: "main" });
const job = createJob(state, {

View File

@@ -1,17 +1,9 @@
import { buildDeliveryFromLegacyPayload, hasLegacyDeliveryHints } from "../legacy-delivery.js";
import type { CronDelivery, CronJobCreate } from "../types.js";
export function resolveInitialCronDelivery(input: CronJobCreate): CronDelivery | undefined {
if (input.delivery) {
return input.delivery;
}
const payloadRecord =
input.payload && typeof input.payload === "object"
? (input.payload as Record<string, unknown>)
: undefined;
if (payloadRecord && hasLegacyDeliveryHints(payloadRecord)) {
return buildDeliveryFromLegacyPayload(payloadRecord) as CronDelivery;
}
if (input.sessionTarget === "isolated" && input.payload.kind === "agentTurn") {
return { mode: "announce" };
}

View File

@@ -1,3 +1,4 @@
import { normalizeCronJobCreate } from "../normalize.js";
import type { CronJob, CronJobCreate, CronJobPatch } from "../types.js";
import {
applyJobPatch,
@@ -234,7 +235,11 @@ export async function add(state: CronServiceState, input: CronJobCreate) {
return await locked(state, async () => {
warnIfDisabled(state, "add");
await ensureLoaded(state);
const job = createJob(state, input);
const normalizedInput = normalizeCronJobCreate(input);
if (!normalizedInput) {
throw new Error("invalid cron job input");
}
const job = createJob(state, normalizedInput);
state.store?.jobs.push(job);
// Defensive: recompute all next-run times to ensure consistency

View File

@@ -1,9 +1,5 @@
import fs from "node:fs";
import {
buildDeliveryFromLegacyPayload,
hasLegacyDeliveryHints,
stripLegacyDeliveryFields,
} from "../legacy-delivery.js";
import { normalizeLegacyDeliveryInput } from "../legacy-delivery.js";
import { parseAbsoluteTimeMs } from "../parse.js";
import { migrateLegacyCronPayload } from "../payload-migration.js";
import { coerceFiniteScheduleNumber } from "../schedule.js";
@@ -14,69 +10,6 @@ import { recomputeNextRuns } from "./jobs.js";
import { inferLegacyName, normalizeOptionalText } from "./normalize.js";
import type { CronServiceState } from "./state.js";
function buildDeliveryPatchFromLegacyPayload(payload: Record<string, unknown>) {
const deliver = payload.deliver;
const channelRaw =
typeof payload.channel === "string" ? payload.channel.trim().toLowerCase() : "";
const toRaw = typeof payload.to === "string" ? payload.to.trim() : "";
const next: Record<string, unknown> = {};
let hasPatch = false;
if (deliver === false) {
next.mode = "none";
hasPatch = true;
} else if (deliver === true || toRaw) {
next.mode = "announce";
hasPatch = true;
}
if (channelRaw) {
next.channel = channelRaw;
hasPatch = true;
}
if (toRaw) {
next.to = toRaw;
hasPatch = true;
}
if (typeof payload.bestEffortDeliver === "boolean") {
next.bestEffort = payload.bestEffortDeliver;
hasPatch = true;
}
return hasPatch ? next : null;
}
function mergeLegacyDeliveryInto(
delivery: Record<string, unknown>,
payload: Record<string, unknown>,
) {
const patch = buildDeliveryPatchFromLegacyPayload(payload);
if (!patch) {
return { delivery, mutated: false };
}
const next = { ...delivery };
let mutated = false;
if ("mode" in patch && patch.mode !== next.mode) {
next.mode = patch.mode;
mutated = true;
}
if ("channel" in patch && patch.channel !== next.channel) {
next.channel = patch.channel;
mutated = true;
}
if ("to" in patch && patch.to !== next.to) {
next.to = patch.to;
mutated = true;
}
if ("bestEffort" in patch && patch.bestEffort !== next.bestEffort) {
next.bestEffort = patch.bestEffort;
mutated = true;
}
return { delivery: next, mutated };
}
function normalizePayloadKind(payload: Record<string, unknown>) {
const raw = typeof payload.kind === "string" ? payload.kind.trim().toLowerCase() : "";
if (raw === "agentturn") {
@@ -512,30 +445,25 @@ export async function ensureLoaded(
const isIsolatedAgentTurn =
sessionTarget === "isolated" || (sessionTarget === "" && payloadKind === "agentTurn");
const hasDelivery = delivery && typeof delivery === "object" && !Array.isArray(delivery);
const hasLegacyDelivery = payloadRecord ? hasLegacyDeliveryHints(payloadRecord) : false;
const normalizedLegacy = normalizeLegacyDeliveryInput({
delivery: hasDelivery ? (delivery as Record<string, unknown>) : null,
payload: payloadRecord,
});
if (isIsolatedAgentTurn && payloadKind === "agentTurn") {
if (!hasDelivery) {
raw.delivery =
payloadRecord && hasLegacyDelivery
? buildDeliveryFromLegacyPayload(payloadRecord)
: { mode: "announce" };
mutated = true;
}
if (payloadRecord && hasLegacyDelivery) {
if (hasDelivery) {
const merged = mergeLegacyDeliveryInto(
delivery as Record<string, unknown>,
payloadRecord,
);
if (merged.mutated) {
raw.delivery = merged.delivery;
mutated = true;
}
}
stripLegacyDeliveryFields(payloadRecord);
if (!hasDelivery && normalizedLegacy.delivery) {
raw.delivery = normalizedLegacy.delivery;
mutated = true;
} else if (!hasDelivery) {
raw.delivery = { mode: "announce" };
mutated = true;
} else if (normalizedLegacy.mutated && normalizedLegacy.delivery) {
raw.delivery = normalizedLegacy.delivery;
mutated = true;
}
} else if (normalizedLegacy.mutated && normalizedLegacy.delivery) {
raw.delivery = normalizedLegacy.delivery;
mutated = true;
}
}
state.store = { version: 1, jobs: jobs as unknown as CronJob[] };