refactor: split cron delivery planning from sending

This commit is contained in:
Shakker
2026-04-01 16:51:50 +01:00
committed by Shakker
parent bd8d29c2b1
commit de952c036a
6 changed files with 251 additions and 245 deletions

233
src/cron/delivery-plan.ts Normal file
View File

@@ -0,0 +1,233 @@
import type { CronFailureDestinationConfig } from "../config/types.cron.js";
import type { CronDelivery, CronDeliveryMode, CronJob, CronMessageChannel } from "./types.js";
export type CronDeliveryPlan = {
mode: CronDeliveryMode;
channel?: CronMessageChannel;
to?: string;
threadId?: string | number;
/** Explicit channel account id from the delivery config, if set. */
accountId?: string;
source: "delivery";
requested: boolean;
};
function normalizeChannel(value: unknown): CronMessageChannel | undefined {
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim().toLowerCase();
if (!trimmed) {
return undefined;
}
return trimmed as CronMessageChannel;
}
function normalizeTo(value: unknown): string | undefined {
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim();
return trimmed ? trimmed : undefined;
}
function normalizeAccountId(value: unknown): string | undefined {
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim();
return trimmed ? trimmed : undefined;
}
function normalizeThreadId(value: unknown): string | number | undefined {
if (typeof value === "number" && Number.isFinite(value)) {
return value;
}
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim();
return trimmed ? trimmed : undefined;
}
export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan {
const delivery = job.delivery;
const hasDelivery = delivery && typeof delivery === "object";
const rawMode = hasDelivery ? (delivery as { mode?: unknown }).mode : undefined;
const normalizedMode = typeof rawMode === "string" ? rawMode.trim().toLowerCase() : rawMode;
const mode =
normalizedMode === "announce"
? "announce"
: normalizedMode === "webhook"
? "webhook"
: normalizedMode === "none"
? "none"
: normalizedMode === "deliver"
? "announce"
: undefined;
const deliveryChannel = normalizeChannel(
(delivery as { channel?: unknown } | undefined)?.channel,
);
const deliveryTo = normalizeTo((delivery as { to?: unknown } | undefined)?.to);
const deliveryThreadId = normalizeThreadId(
(delivery as { threadId?: unknown } | undefined)?.threadId,
);
const channel = deliveryChannel ?? "last";
const to = deliveryTo;
const deliveryAccountId = normalizeAccountId(
(delivery as { accountId?: unknown } | undefined)?.accountId,
);
if (hasDelivery) {
const resolvedMode = mode ?? "announce";
return {
mode: resolvedMode,
channel: resolvedMode === "announce" ? channel : undefined,
to,
threadId: resolvedMode === "announce" ? deliveryThreadId : undefined,
accountId: deliveryAccountId,
source: "delivery",
requested: resolvedMode === "announce",
};
}
const isIsolatedAgentTurn =
job.payload.kind === "agentTurn" &&
(job.sessionTarget === "isolated" ||
job.sessionTarget === "current" ||
job.sessionTarget.startsWith("session:"));
const resolvedMode = isIsolatedAgentTurn ? "announce" : "none";
return {
mode: resolvedMode,
channel: resolvedMode === "announce" ? "last" : undefined,
to: undefined,
threadId: undefined,
source: "delivery",
requested: resolvedMode === "announce",
};
}
export type CronFailureDeliveryPlan = {
mode: "announce" | "webhook";
channel?: CronMessageChannel;
to?: string;
accountId?: string;
};
export type CronFailureDestinationInput = {
channel?: CronMessageChannel;
to?: string;
accountId?: string;
mode?: "announce" | "webhook";
};
function normalizeFailureMode(value: unknown): "announce" | "webhook" | undefined {
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim().toLowerCase();
if (trimmed === "announce" || trimmed === "webhook") {
return trimmed;
}
return undefined;
}
export function resolveFailureDestination(
job: CronJob,
globalConfig?: CronFailureDestinationConfig,
): CronFailureDeliveryPlan | null {
const delivery = job.delivery;
const jobFailureDest = delivery?.failureDestination as CronFailureDestinationInput | undefined;
const hasJobFailureDest = jobFailureDest && typeof jobFailureDest === "object";
let channel: CronMessageChannel | undefined;
let to: string | undefined;
let accountId: string | undefined;
let mode: "announce" | "webhook" | undefined;
if (globalConfig) {
channel = normalizeChannel(globalConfig.channel);
to = normalizeTo(globalConfig.to);
accountId = normalizeAccountId(globalConfig.accountId);
mode = normalizeFailureMode(globalConfig.mode);
}
if (hasJobFailureDest) {
const jobChannel = normalizeChannel(jobFailureDest.channel);
const jobTo = normalizeTo(jobFailureDest.to);
const jobAccountId = normalizeAccountId(jobFailureDest.accountId);
const jobMode = normalizeFailureMode(jobFailureDest.mode);
const hasJobChannelField = "channel" in jobFailureDest;
const hasJobToField = "to" in jobFailureDest;
const hasJobAccountIdField = "accountId" in jobFailureDest;
const jobToExplicitValue = hasJobToField && jobTo !== undefined;
if (hasJobChannelField) {
channel = jobChannel;
}
if (hasJobToField) {
to = jobTo;
}
if (hasJobAccountIdField) {
accountId = jobAccountId;
}
if (jobMode !== undefined) {
const globalMode = globalConfig?.mode ?? "announce";
if (!jobToExplicitValue && globalMode !== jobMode) {
to = undefined;
}
mode = jobMode;
}
}
if (!channel && !to && !accountId && !mode) {
return null;
}
const resolvedMode = mode ?? "announce";
if (resolvedMode === "webhook" && !to) {
return null;
}
const result: CronFailureDeliveryPlan = {
mode: resolvedMode,
channel: resolvedMode === "announce" ? (channel ?? "last") : undefined,
to,
accountId,
};
if (delivery && isSameDeliveryTarget(delivery, result)) {
return null;
}
return result;
}
function isSameDeliveryTarget(
delivery: CronDelivery,
failurePlan: CronFailureDeliveryPlan,
): boolean {
const primaryMode = delivery.mode ?? "announce";
if (primaryMode === "none") {
return false;
}
const primaryChannel = delivery.channel;
const primaryTo = delivery.to;
const primaryAccountId = delivery.accountId;
if (failurePlan.mode === "webhook") {
return primaryMode === "webhook" && primaryTo === failurePlan.to;
}
const primaryChannelNormalized = primaryChannel ?? "last";
const failureChannelNormalized = failurePlan.channel ?? "last";
return (
failureChannelNormalized === primaryChannelNormalized &&
failurePlan.to === primaryTo &&
failurePlan.accountId === primaryAccountId
);
}

View File

@@ -1,5 +1,5 @@
import { describe, expect, it } from "vitest";
import { resolveCronDeliveryPlan, resolveFailureDestination } from "./delivery.js";
import { resolveCronDeliveryPlan, resolveFailureDestination } from "./delivery-plan.js";
import type { CronJob } from "./types.js";
function makeJob(overrides: Partial<CronJob>): CronJob {

View File

@@ -1,256 +1,29 @@
import type { CliDeps } from "../cli/deps.js";
import { createOutboundSendDeps } from "../cli/outbound-send-deps.js";
import type { CronFailureDestinationConfig } from "../config/types.cron.js";
import type { OpenClawConfig } from "../config/types.js";
import { formatErrorMessage } from "../infra/errors.js";
import { deliverOutboundPayloads } from "../infra/outbound/deliver.js";
import { resolveAgentOutboundIdentity } from "../infra/outbound/identity.js";
import { buildOutboundSessionContext } from "../infra/outbound/session-context.js";
import { getChildLogger } from "../logging.js";
import {
resolveFailureDestination,
type CronFailureDeliveryPlan,
type CronFailureDestinationInput,
type CronDeliveryPlan,
resolveCronDeliveryPlan,
} from "./delivery-plan.js";
import { resolveDeliveryTarget } from "./isolated-agent/delivery-target.js";
import type { CronDelivery, CronDeliveryMode, CronJob, CronMessageChannel } from "./types.js";
import type { CronMessageChannel } from "./types.js";
export type CronDeliveryPlan = {
mode: CronDeliveryMode;
channel?: CronMessageChannel;
to?: string;
threadId?: string | number;
/** Explicit channel account id from the delivery config, if set. */
accountId?: string;
source: "delivery";
requested: boolean;
export {
resolveCronDeliveryPlan,
resolveFailureDestination,
type CronDeliveryPlan,
type CronFailureDeliveryPlan,
type CronFailureDestinationInput,
};
function normalizeChannel(value: unknown): CronMessageChannel | undefined {
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim().toLowerCase();
if (!trimmed) {
return undefined;
}
return trimmed as CronMessageChannel;
}
function normalizeTo(value: unknown): string | undefined {
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim();
return trimmed ? trimmed : undefined;
}
function normalizeAccountId(value: unknown): string | undefined {
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim();
return trimmed ? trimmed : undefined;
}
function normalizeThreadId(value: unknown): string | number | undefined {
if (typeof value === "number" && Number.isFinite(value)) {
return value;
}
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim();
return trimmed ? trimmed : undefined;
}
export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan {
const delivery = job.delivery;
const hasDelivery = delivery && typeof delivery === "object";
const rawMode = hasDelivery ? (delivery as { mode?: unknown }).mode : undefined;
const normalizedMode = typeof rawMode === "string" ? rawMode.trim().toLowerCase() : rawMode;
const mode =
normalizedMode === "announce"
? "announce"
: normalizedMode === "webhook"
? "webhook"
: normalizedMode === "none"
? "none"
: normalizedMode === "deliver"
? "announce"
: undefined;
const deliveryChannel = normalizeChannel(
(delivery as { channel?: unknown } | undefined)?.channel,
);
const deliveryTo = normalizeTo((delivery as { to?: unknown } | undefined)?.to);
const deliveryThreadId = normalizeThreadId(
(delivery as { threadId?: unknown } | undefined)?.threadId,
);
const channel = deliveryChannel ?? "last";
const to = deliveryTo;
const deliveryAccountId = normalizeAccountId(
(delivery as { accountId?: unknown } | undefined)?.accountId,
);
if (hasDelivery) {
const resolvedMode = mode ?? "announce";
return {
mode: resolvedMode,
channel: resolvedMode === "announce" ? channel : undefined,
to,
threadId: resolvedMode === "announce" ? deliveryThreadId : undefined,
accountId: deliveryAccountId,
source: "delivery",
requested: resolvedMode === "announce",
};
}
const isIsolatedAgentTurn =
job.payload.kind === "agentTurn" &&
(job.sessionTarget === "isolated" ||
job.sessionTarget === "current" ||
job.sessionTarget.startsWith("session:"));
const resolvedMode = isIsolatedAgentTurn ? "announce" : "none";
return {
mode: resolvedMode,
channel: resolvedMode === "announce" ? "last" : undefined,
to: undefined,
threadId: undefined,
source: "delivery",
requested: resolvedMode === "announce",
};
}
export type CronFailureDeliveryPlan = {
mode: "announce" | "webhook";
channel?: CronMessageChannel;
to?: string;
accountId?: string;
};
export type CronFailureDestinationInput = {
channel?: CronMessageChannel;
to?: string;
accountId?: string;
mode?: "announce" | "webhook";
};
function normalizeFailureMode(value: unknown): "announce" | "webhook" | undefined {
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim().toLowerCase();
if (trimmed === "announce" || trimmed === "webhook") {
return trimmed;
}
return undefined;
}
export function resolveFailureDestination(
job: CronJob,
globalConfig?: CronFailureDestinationConfig,
): CronFailureDeliveryPlan | null {
const delivery = job.delivery;
const jobFailureDest = delivery?.failureDestination as CronFailureDestinationInput | undefined;
const hasJobFailureDest = jobFailureDest && typeof jobFailureDest === "object";
let channel: CronMessageChannel | undefined;
let to: string | undefined;
let accountId: string | undefined;
let mode: "announce" | "webhook" | undefined;
// Start with global config as base
if (globalConfig) {
channel = normalizeChannel(globalConfig.channel);
to = normalizeTo(globalConfig.to);
accountId = normalizeAccountId(globalConfig.accountId);
mode = normalizeFailureMode(globalConfig.mode);
}
// Override with job-level values if present
if (hasJobFailureDest) {
const jobChannel = normalizeChannel(jobFailureDest.channel);
const jobTo = normalizeTo(jobFailureDest.to);
const jobAccountId = normalizeAccountId(jobFailureDest.accountId);
const jobMode = normalizeFailureMode(jobFailureDest.mode);
const hasJobChannelField = "channel" in jobFailureDest;
const hasJobToField = "to" in jobFailureDest;
const hasJobAccountIdField = "accountId" in jobFailureDest;
// Track if 'to' was explicitly set at job level
const jobToExplicitValue = hasJobToField && jobTo !== undefined;
// Respect explicit clears from partial patches.
if (hasJobChannelField) {
channel = jobChannel;
}
if (hasJobToField) {
to = jobTo;
}
if (hasJobAccountIdField) {
accountId = jobAccountId;
}
if (jobMode !== undefined) {
// Mode was explicitly overridden - clear inherited 'to' since URL semantics differ
// between announce (channel recipient) and webhook (HTTP endpoint)
// But preserve explicit 'to' that was set at job level
// Treat undefined global mode as "announce" for comparison
const globalMode = globalConfig?.mode ?? "announce";
if (!jobToExplicitValue && globalMode !== jobMode) {
to = undefined;
}
mode = jobMode;
}
}
if (!channel && !to && !accountId && !mode) {
return null;
}
const resolvedMode = mode ?? "announce";
// Webhook mode requires a URL
if (resolvedMode === "webhook" && !to) {
return null;
}
const result: CronFailureDeliveryPlan = {
mode: resolvedMode,
channel: resolvedMode === "announce" ? (channel ?? "last") : undefined,
to,
accountId,
};
if (delivery && isSameDeliveryTarget(delivery, result)) {
return null;
}
return result;
}
function isSameDeliveryTarget(
delivery: CronDelivery,
failurePlan: CronFailureDeliveryPlan,
): boolean {
const primaryMode = delivery.mode ?? "announce";
if (primaryMode === "none") {
return false;
}
const primaryChannel = delivery.channel;
const primaryTo = delivery.to;
const primaryAccountId = delivery.accountId;
if (failurePlan.mode === "webhook") {
return primaryMode === "webhook" && primaryTo === failurePlan.to;
}
const primaryChannelNormalized = primaryChannel ?? "last";
const failureChannelNormalized = failurePlan.channel ?? "last";
return (
failureChannelNormalized === primaryChannelNormalized &&
failurePlan.to === primaryTo &&
failurePlan.accountId === primaryAccountId
);
}
const FAILURE_NOTIFICATION_TIMEOUT_MS = 30_000;
const cronDeliveryLogger = getChildLogger({ subsystem: "cron-delivery" });

View File

@@ -245,7 +245,7 @@ vi.mock("../../security/external-content.js", async (importOriginal) => {
};
});
vi.mock("../delivery.js", () => ({
vi.mock("../delivery-plan.js", () => ({
resolveCronDeliveryPlan: resolveCronDeliveryPlanMock,
}));

View File

@@ -43,7 +43,7 @@ import {
isExternalHookSession,
resolveHookExternalContentSource,
} from "../../security/external-content.js";
import { resolveCronDeliveryPlan } from "../delivery.js";
import { resolveCronDeliveryPlan } from "../delivery-plan.js";
import type { CronJob, CronRunOutcome, CronRunTelemetry } from "../types.js";
import {
dispatchCronDelivery,

View File

@@ -7,7 +7,7 @@ import {
createRunningTaskRun,
failTaskRunByRunId,
} from "../../tasks/task-executor.js";
import { resolveCronDeliveryPlan } from "../delivery.js";
import { resolveCronDeliveryPlan } from "../delivery-plan.js";
import { sweepCronRunSessions } from "../session-reaper.js";
import type {
CronDeliveryStatus,