diff --git a/src/cron/delivery-plan.ts b/src/cron/delivery-plan.ts new file mode 100644 index 00000000000..a04d4c64629 --- /dev/null +++ b/src/cron/delivery-plan.ts @@ -0,0 +1,233 @@ +import type { CronFailureDestinationConfig } from "../config/types.cron.js"; +import type { CronDelivery, CronDeliveryMode, CronJob, CronMessageChannel } from "./types.js"; + +export type CronDeliveryPlan = { + mode: CronDeliveryMode; + channel?: CronMessageChannel; + to?: string; + threadId?: string | number; + /** Explicit channel account id from the delivery config, if set. */ + accountId?: string; + source: "delivery"; + requested: boolean; +}; + +function normalizeChannel(value: unknown): CronMessageChannel | undefined { + if (typeof value !== "string") { + return undefined; + } + const trimmed = value.trim().toLowerCase(); + if (!trimmed) { + return undefined; + } + return trimmed as CronMessageChannel; +} + +function normalizeTo(value: unknown): string | undefined { + if (typeof value !== "string") { + return undefined; + } + const trimmed = value.trim(); + return trimmed ? trimmed : undefined; +} + +function normalizeAccountId(value: unknown): string | undefined { + if (typeof value !== "string") { + return undefined; + } + const trimmed = value.trim(); + return trimmed ? trimmed : undefined; +} + +function normalizeThreadId(value: unknown): string | number | undefined { + if (typeof value === "number" && Number.isFinite(value)) { + return value; + } + if (typeof value !== "string") { + return undefined; + } + const trimmed = value.trim(); + return trimmed ? trimmed : undefined; +} + +export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan { + const delivery = job.delivery; + const hasDelivery = delivery && typeof delivery === "object"; + const rawMode = hasDelivery ? (delivery as { mode?: unknown }).mode : undefined; + const normalizedMode = typeof rawMode === "string" ? rawMode.trim().toLowerCase() : rawMode; + const mode = + normalizedMode === "announce" + ? "announce" + : normalizedMode === "webhook" + ? "webhook" + : normalizedMode === "none" + ? "none" + : normalizedMode === "deliver" + ? "announce" + : undefined; + + const deliveryChannel = normalizeChannel( + (delivery as { channel?: unknown } | undefined)?.channel, + ); + const deliveryTo = normalizeTo((delivery as { to?: unknown } | undefined)?.to); + const deliveryThreadId = normalizeThreadId( + (delivery as { threadId?: unknown } | undefined)?.threadId, + ); + const channel = deliveryChannel ?? "last"; + const to = deliveryTo; + const deliveryAccountId = normalizeAccountId( + (delivery as { accountId?: unknown } | undefined)?.accountId, + ); + if (hasDelivery) { + const resolvedMode = mode ?? "announce"; + return { + mode: resolvedMode, + channel: resolvedMode === "announce" ? channel : undefined, + to, + threadId: resolvedMode === "announce" ? deliveryThreadId : undefined, + accountId: deliveryAccountId, + source: "delivery", + requested: resolvedMode === "announce", + }; + } + + const isIsolatedAgentTurn = + job.payload.kind === "agentTurn" && + (job.sessionTarget === "isolated" || + job.sessionTarget === "current" || + job.sessionTarget.startsWith("session:")); + const resolvedMode = isIsolatedAgentTurn ? "announce" : "none"; + + return { + mode: resolvedMode, + channel: resolvedMode === "announce" ? "last" : undefined, + to: undefined, + threadId: undefined, + source: "delivery", + requested: resolvedMode === "announce", + }; +} + +export type CronFailureDeliveryPlan = { + mode: "announce" | "webhook"; + channel?: CronMessageChannel; + to?: string; + accountId?: string; +}; + +export type CronFailureDestinationInput = { + channel?: CronMessageChannel; + to?: string; + accountId?: string; + mode?: "announce" | "webhook"; +}; + +function normalizeFailureMode(value: unknown): "announce" | "webhook" | undefined { + if (typeof value !== "string") { + return undefined; + } + const trimmed = value.trim().toLowerCase(); + if (trimmed === "announce" || trimmed === "webhook") { + return trimmed; + } + return undefined; +} + +export function resolveFailureDestination( + job: CronJob, + globalConfig?: CronFailureDestinationConfig, +): CronFailureDeliveryPlan | null { + const delivery = job.delivery; + const jobFailureDest = delivery?.failureDestination as CronFailureDestinationInput | undefined; + const hasJobFailureDest = jobFailureDest && typeof jobFailureDest === "object"; + + let channel: CronMessageChannel | undefined; + let to: string | undefined; + let accountId: string | undefined; + let mode: "announce" | "webhook" | undefined; + + if (globalConfig) { + channel = normalizeChannel(globalConfig.channel); + to = normalizeTo(globalConfig.to); + accountId = normalizeAccountId(globalConfig.accountId); + mode = normalizeFailureMode(globalConfig.mode); + } + + if (hasJobFailureDest) { + const jobChannel = normalizeChannel(jobFailureDest.channel); + const jobTo = normalizeTo(jobFailureDest.to); + const jobAccountId = normalizeAccountId(jobFailureDest.accountId); + const jobMode = normalizeFailureMode(jobFailureDest.mode); + const hasJobChannelField = "channel" in jobFailureDest; + const hasJobToField = "to" in jobFailureDest; + const hasJobAccountIdField = "accountId" in jobFailureDest; + + const jobToExplicitValue = hasJobToField && jobTo !== undefined; + + if (hasJobChannelField) { + channel = jobChannel; + } + if (hasJobToField) { + to = jobTo; + } + if (hasJobAccountIdField) { + accountId = jobAccountId; + } + if (jobMode !== undefined) { + const globalMode = globalConfig?.mode ?? "announce"; + if (!jobToExplicitValue && globalMode !== jobMode) { + to = undefined; + } + mode = jobMode; + } + } + + if (!channel && !to && !accountId && !mode) { + return null; + } + + const resolvedMode = mode ?? "announce"; + if (resolvedMode === "webhook" && !to) { + return null; + } + + const result: CronFailureDeliveryPlan = { + mode: resolvedMode, + channel: resolvedMode === "announce" ? (channel ?? "last") : undefined, + to, + accountId, + }; + + if (delivery && isSameDeliveryTarget(delivery, result)) { + return null; + } + + return result; +} + +function isSameDeliveryTarget( + delivery: CronDelivery, + failurePlan: CronFailureDeliveryPlan, +): boolean { + const primaryMode = delivery.mode ?? "announce"; + if (primaryMode === "none") { + return false; + } + + const primaryChannel = delivery.channel; + const primaryTo = delivery.to; + const primaryAccountId = delivery.accountId; + + if (failurePlan.mode === "webhook") { + return primaryMode === "webhook" && primaryTo === failurePlan.to; + } + + const primaryChannelNormalized = primaryChannel ?? "last"; + const failureChannelNormalized = failurePlan.channel ?? "last"; + + return ( + failureChannelNormalized === primaryChannelNormalized && + failurePlan.to === primaryTo && + failurePlan.accountId === primaryAccountId + ); +} diff --git a/src/cron/delivery.test.ts b/src/cron/delivery.test.ts index 2a2e2aa29ab..f30ddb22d4a 100644 --- a/src/cron/delivery.test.ts +++ b/src/cron/delivery.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it } from "vitest"; -import { resolveCronDeliveryPlan, resolveFailureDestination } from "./delivery.js"; +import { resolveCronDeliveryPlan, resolveFailureDestination } from "./delivery-plan.js"; import type { CronJob } from "./types.js"; function makeJob(overrides: Partial): CronJob { diff --git a/src/cron/delivery.ts b/src/cron/delivery.ts index 2f2e29fedac..74cb24e6aec 100644 --- a/src/cron/delivery.ts +++ b/src/cron/delivery.ts @@ -1,256 +1,29 @@ import type { CliDeps } from "../cli/deps.js"; import { createOutboundSendDeps } from "../cli/outbound-send-deps.js"; -import type { CronFailureDestinationConfig } from "../config/types.cron.js"; import type { OpenClawConfig } from "../config/types.js"; import { formatErrorMessage } from "../infra/errors.js"; import { deliverOutboundPayloads } from "../infra/outbound/deliver.js"; import { resolveAgentOutboundIdentity } from "../infra/outbound/identity.js"; import { buildOutboundSessionContext } from "../infra/outbound/session-context.js"; import { getChildLogger } from "../logging.js"; +import { + resolveFailureDestination, + type CronFailureDeliveryPlan, + type CronFailureDestinationInput, + type CronDeliveryPlan, + resolveCronDeliveryPlan, +} from "./delivery-plan.js"; import { resolveDeliveryTarget } from "./isolated-agent/delivery-target.js"; -import type { CronDelivery, CronDeliveryMode, CronJob, CronMessageChannel } from "./types.js"; +import type { CronMessageChannel } from "./types.js"; -export type CronDeliveryPlan = { - mode: CronDeliveryMode; - channel?: CronMessageChannel; - to?: string; - threadId?: string | number; - /** Explicit channel account id from the delivery config, if set. */ - accountId?: string; - source: "delivery"; - requested: boolean; +export { + resolveCronDeliveryPlan, + resolveFailureDestination, + type CronDeliveryPlan, + type CronFailureDeliveryPlan, + type CronFailureDestinationInput, }; -function normalizeChannel(value: unknown): CronMessageChannel | undefined { - if (typeof value !== "string") { - return undefined; - } - const trimmed = value.trim().toLowerCase(); - if (!trimmed) { - return undefined; - } - return trimmed as CronMessageChannel; -} - -function normalizeTo(value: unknown): string | undefined { - if (typeof value !== "string") { - return undefined; - } - const trimmed = value.trim(); - return trimmed ? trimmed : undefined; -} - -function normalizeAccountId(value: unknown): string | undefined { - if (typeof value !== "string") { - return undefined; - } - const trimmed = value.trim(); - return trimmed ? trimmed : undefined; -} - -function normalizeThreadId(value: unknown): string | number | undefined { - if (typeof value === "number" && Number.isFinite(value)) { - return value; - } - if (typeof value !== "string") { - return undefined; - } - const trimmed = value.trim(); - return trimmed ? trimmed : undefined; -} - -export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan { - const delivery = job.delivery; - const hasDelivery = delivery && typeof delivery === "object"; - const rawMode = hasDelivery ? (delivery as { mode?: unknown }).mode : undefined; - const normalizedMode = typeof rawMode === "string" ? rawMode.trim().toLowerCase() : rawMode; - const mode = - normalizedMode === "announce" - ? "announce" - : normalizedMode === "webhook" - ? "webhook" - : normalizedMode === "none" - ? "none" - : normalizedMode === "deliver" - ? "announce" - : undefined; - - const deliveryChannel = normalizeChannel( - (delivery as { channel?: unknown } | undefined)?.channel, - ); - const deliveryTo = normalizeTo((delivery as { to?: unknown } | undefined)?.to); - const deliveryThreadId = normalizeThreadId( - (delivery as { threadId?: unknown } | undefined)?.threadId, - ); - const channel = deliveryChannel ?? "last"; - const to = deliveryTo; - const deliveryAccountId = normalizeAccountId( - (delivery as { accountId?: unknown } | undefined)?.accountId, - ); - if (hasDelivery) { - const resolvedMode = mode ?? "announce"; - return { - mode: resolvedMode, - channel: resolvedMode === "announce" ? channel : undefined, - to, - threadId: resolvedMode === "announce" ? deliveryThreadId : undefined, - accountId: deliveryAccountId, - source: "delivery", - requested: resolvedMode === "announce", - }; - } - - const isIsolatedAgentTurn = - job.payload.kind === "agentTurn" && - (job.sessionTarget === "isolated" || - job.sessionTarget === "current" || - job.sessionTarget.startsWith("session:")); - const resolvedMode = isIsolatedAgentTurn ? "announce" : "none"; - - return { - mode: resolvedMode, - channel: resolvedMode === "announce" ? "last" : undefined, - to: undefined, - threadId: undefined, - source: "delivery", - requested: resolvedMode === "announce", - }; -} - -export type CronFailureDeliveryPlan = { - mode: "announce" | "webhook"; - channel?: CronMessageChannel; - to?: string; - accountId?: string; -}; - -export type CronFailureDestinationInput = { - channel?: CronMessageChannel; - to?: string; - accountId?: string; - mode?: "announce" | "webhook"; -}; - -function normalizeFailureMode(value: unknown): "announce" | "webhook" | undefined { - if (typeof value !== "string") { - return undefined; - } - const trimmed = value.trim().toLowerCase(); - if (trimmed === "announce" || trimmed === "webhook") { - return trimmed; - } - return undefined; -} - -export function resolveFailureDestination( - job: CronJob, - globalConfig?: CronFailureDestinationConfig, -): CronFailureDeliveryPlan | null { - const delivery = job.delivery; - const jobFailureDest = delivery?.failureDestination as CronFailureDestinationInput | undefined; - const hasJobFailureDest = jobFailureDest && typeof jobFailureDest === "object"; - - let channel: CronMessageChannel | undefined; - let to: string | undefined; - let accountId: string | undefined; - let mode: "announce" | "webhook" | undefined; - - // Start with global config as base - if (globalConfig) { - channel = normalizeChannel(globalConfig.channel); - to = normalizeTo(globalConfig.to); - accountId = normalizeAccountId(globalConfig.accountId); - mode = normalizeFailureMode(globalConfig.mode); - } - - // Override with job-level values if present - if (hasJobFailureDest) { - const jobChannel = normalizeChannel(jobFailureDest.channel); - const jobTo = normalizeTo(jobFailureDest.to); - const jobAccountId = normalizeAccountId(jobFailureDest.accountId); - const jobMode = normalizeFailureMode(jobFailureDest.mode); - const hasJobChannelField = "channel" in jobFailureDest; - const hasJobToField = "to" in jobFailureDest; - const hasJobAccountIdField = "accountId" in jobFailureDest; - - // Track if 'to' was explicitly set at job level - const jobToExplicitValue = hasJobToField && jobTo !== undefined; - - // Respect explicit clears from partial patches. - if (hasJobChannelField) { - channel = jobChannel; - } - if (hasJobToField) { - to = jobTo; - } - if (hasJobAccountIdField) { - accountId = jobAccountId; - } - if (jobMode !== undefined) { - // Mode was explicitly overridden - clear inherited 'to' since URL semantics differ - // between announce (channel recipient) and webhook (HTTP endpoint) - // But preserve explicit 'to' that was set at job level - // Treat undefined global mode as "announce" for comparison - const globalMode = globalConfig?.mode ?? "announce"; - if (!jobToExplicitValue && globalMode !== jobMode) { - to = undefined; - } - mode = jobMode; - } - } - - if (!channel && !to && !accountId && !mode) { - return null; - } - - const resolvedMode = mode ?? "announce"; - - // Webhook mode requires a URL - if (resolvedMode === "webhook" && !to) { - return null; - } - - const result: CronFailureDeliveryPlan = { - mode: resolvedMode, - channel: resolvedMode === "announce" ? (channel ?? "last") : undefined, - to, - accountId, - }; - - if (delivery && isSameDeliveryTarget(delivery, result)) { - return null; - } - - return result; -} - -function isSameDeliveryTarget( - delivery: CronDelivery, - failurePlan: CronFailureDeliveryPlan, -): boolean { - const primaryMode = delivery.mode ?? "announce"; - if (primaryMode === "none") { - return false; - } - - const primaryChannel = delivery.channel; - const primaryTo = delivery.to; - const primaryAccountId = delivery.accountId; - - if (failurePlan.mode === "webhook") { - return primaryMode === "webhook" && primaryTo === failurePlan.to; - } - - const primaryChannelNormalized = primaryChannel ?? "last"; - const failureChannelNormalized = failurePlan.channel ?? "last"; - - return ( - failureChannelNormalized === primaryChannelNormalized && - failurePlan.to === primaryTo && - failurePlan.accountId === primaryAccountId - ); -} - const FAILURE_NOTIFICATION_TIMEOUT_MS = 30_000; const cronDeliveryLogger = getChildLogger({ subsystem: "cron-delivery" }); diff --git a/src/cron/isolated-agent/run.test-harness.ts b/src/cron/isolated-agent/run.test-harness.ts index df7543c8ded..4daa91fee7e 100644 --- a/src/cron/isolated-agent/run.test-harness.ts +++ b/src/cron/isolated-agent/run.test-harness.ts @@ -245,7 +245,7 @@ vi.mock("../../security/external-content.js", async (importOriginal) => { }; }); -vi.mock("../delivery.js", () => ({ +vi.mock("../delivery-plan.js", () => ({ resolveCronDeliveryPlan: resolveCronDeliveryPlanMock, })); diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index b3eb1c2cfde..b3eb4b61965 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -43,7 +43,7 @@ import { isExternalHookSession, resolveHookExternalContentSource, } from "../../security/external-content.js"; -import { resolveCronDeliveryPlan } from "../delivery.js"; +import { resolveCronDeliveryPlan } from "../delivery-plan.js"; import type { CronJob, CronRunOutcome, CronRunTelemetry } from "../types.js"; import { dispatchCronDelivery, diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 36442c06b53..edd9f93c2b6 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -7,7 +7,7 @@ import { createRunningTaskRun, failTaskRunByRunId, } from "../../tasks/task-executor.js"; -import { resolveCronDeliveryPlan } from "../delivery.js"; +import { resolveCronDeliveryPlan } from "../delivery-plan.js"; import { sweepCronRunSessions } from "../session-reaper.js"; import type { CronDeliveryStatus,