diff --git a/CHANGELOG.md b/CHANGELOG.md index e2db99bd56a..f2e05d2e044 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -200,6 +200,7 @@ Docs: https://docs.openclaw.ai - Discord/Application ID fallback: parse bot application IDs from token prefixes without numeric precision loss and use token fallback only on transport/timeout failures when probing `/oauth2/applications/@me`. Landed from contributor PR #29695 by @dhananjai1729. Thanks @dhananjai1729. - Discord/EventQueue timeout config: expose per-account `channels.discord.accounts..eventQueue.listenerTimeout` (and related queue options) so long-running handlers can avoid Carbon listener timeout drops. Landed from contributor PR #28945 by @Glucksberg. Thanks @Glucksberg. - CLI/Cron run exit code: return exit code `0` only when `cron run` reports `{ ok: true, ran: true }`, and `1` for non-run/error outcomes so scripting/debugging reflects actual execution status. Landed from contributor PR #31121 by @Sid-Qin. Thanks @Sid-Qin. +- Cron/Failure delivery routing: add `failureAlert.mode` (`announce|webhook`) and `failureAlert.accountId` support, plus `cron.failureDestination` and per-job `delivery.failureDestination` routing with duplicate-target suppression, best-effort skip behavior, and global+job merge semantics. Landed from contributor PR #31059 by @kesor. Thanks @kesor. - CLI/JSON preflight output: keep `--json` command stdout machine-readable by suppressing doctor preflight note output while still running legacy migration/config doctor flow. (#24368) Thanks @altaywtf. - Nodes/Screen recording guardrails: cap `nodes` tool `screen_record` `durationMs` to 5 minutes at both schema-validation and runtime invocation layers to prevent long-running blocking captures from unbounded durations. Landed from contributor PR #31106 by @BlueBirdBack. Thanks @BlueBirdBack. - Telegram/Empty final replies: skip outbound send for null/undefined final text payloads without media so Telegram typing indicators do not linger on `text must be non-empty` errors, with added regression coverage for undefined final payload dispatch. Landed from contributor PRs #30969 by @haosenwang1018 and #30746 by @rylena. Thanks @haosenwang1018 and @rylena. diff --git a/src/cli/cron-cli.test.ts b/src/cli/cron-cli.test.ts index 4ebb6736106..8a9171042a2 100644 --- a/src/cli/cron-cli.test.ts +++ b/src/cli/cron-cli.test.ts @@ -694,4 +694,40 @@ describe("cron cli", () => { }); expect(runOpts.timeout).toBe("45000"); }); + + it("patches failure alert mode/accountId on cron edit", async () => { + callGatewayFromCli.mockClear(); + + const program = buildProgram(); + + await program.parseAsync( + [ + "cron", + "edit", + "job-1", + "--failure-alert-after", + "1", + "--failure-alert-mode", + "webhook", + "--failure-alert-account-id", + "bot-a", + ], + { from: "user" }, + ); + + const updateCall = callGatewayFromCli.mock.calls.find((call) => call[0] === "cron.update"); + const patch = updateCall?.[2] as { + patch?: { + failureAlert?: { + after?: number; + mode?: "announce" | "webhook"; + accountId?: string; + }; + }; + }; + + expect(patch?.patch?.failureAlert?.after).toBe(1); + expect(patch?.patch?.failureAlert?.mode).toBe("webhook"); + expect(patch?.patch?.failureAlert?.accountId).toBe("bot-a"); + }); }); diff --git a/src/cron/delivery.test.ts b/src/cron/delivery.test.ts index 7cc690f79cf..81ab672af57 100644 --- a/src/cron/delivery.test.ts +++ b/src/cron/delivery.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it } from "vitest"; -import { resolveCronDeliveryPlan } from "./delivery.js"; +import { resolveCronDeliveryPlan, resolveFailureDestination } from "./delivery.js"; import type { CronJob } from "./types.js"; function makeJob(overrides: Partial): CronJob { @@ -85,3 +85,96 @@ describe("resolveCronDeliveryPlan", () => { expect(plan.accountId).toBe("bot-a"); }); }); + +describe("resolveFailureDestination", () => { + it("merges global defaults with job-level overrides", () => { + const plan = resolveFailureDestination( + makeJob({ + delivery: { + mode: "announce", + channel: "telegram", + to: "111", + failureDestination: { channel: "signal", mode: "announce" }, + }, + }), + { + channel: "telegram", + to: "222", + mode: "announce", + accountId: "global-account", + }, + ); + expect(plan).toEqual({ + mode: "announce", + channel: "signal", + to: "222", + accountId: "global-account", + }); + }); + + it("returns null for webhook mode without destination URL", () => { + const plan = resolveFailureDestination( + makeJob({ + delivery: { + mode: "announce", + channel: "telegram", + to: "111", + failureDestination: { mode: "webhook" }, + }, + }), + undefined, + ); + expect(plan).toBeNull(); + }); + + it("returns null when failure destination matches primary delivery target", () => { + const plan = resolveFailureDestination( + makeJob({ + delivery: { + mode: "announce", + channel: "telegram", + to: "111", + accountId: "bot-a", + failureDestination: { + mode: "announce", + channel: "telegram", + to: "111", + accountId: "bot-a", + }, + }, + }), + undefined, + ); + expect(plan).toBeNull(); + }); + + it("allows job-level failure destination fields to clear inherited global values", () => { + const plan = resolveFailureDestination( + makeJob({ + delivery: { + mode: "announce", + channel: "telegram", + to: "111", + failureDestination: { + mode: "announce", + channel: undefined as never, + to: undefined as never, + accountId: undefined as never, + }, + }, + }), + { + channel: "signal", + to: "group-abc", + accountId: "global-account", + mode: "announce", + }, + ); + expect(plan).toEqual({ + mode: "announce", + channel: "last", + to: undefined, + accountId: undefined, + }); + }); +}); diff --git a/src/cron/delivery.ts b/src/cron/delivery.ts index a9ef3faa1b8..9d502a74fcb 100644 --- a/src/cron/delivery.ts +++ b/src/cron/delivery.ts @@ -1,7 +1,7 @@ import type { CliDeps } from "../cli/deps.js"; import { createOutboundSendDeps } from "../cli/outbound-send-deps.js"; -import { loadConfig } from "../config/config.js"; import type { CronFailureDestinationConfig } from "../config/types.cron.js"; +import type { OpenClawConfig } from "../config/types.js"; import { formatErrorMessage } from "../infra/errors.js"; import { deliverOutboundPayloads } from "../infra/outbound/deliver.js"; import { resolveAgentOutboundIdentity } from "../infra/outbound/identity.js"; @@ -153,18 +153,21 @@ export function resolveFailureDestination( const jobTo = normalizeTo(jobFailureDest.to); const jobAccountId = normalizeAccountId(jobFailureDest.accountId); const jobMode = normalizeFailureMode(jobFailureDest.mode); + const hasJobChannelField = "channel" in jobFailureDest; + const hasJobToField = "to" in jobFailureDest; + const hasJobAccountIdField = "accountId" in jobFailureDest; // Track if 'to' was explicitly set at job level - const jobToExplicit = "to" in jobFailureDest && jobFailureDest.to !== undefined; + const jobToExplicitValue = hasJobToField && jobTo !== undefined; - // Only override if explicitly set (not undefined) - if (jobChannel !== undefined) { + // Respect explicit clears from partial patches. + if (hasJobChannelField) { channel = jobChannel; } - if (jobTo !== undefined) { + if (hasJobToField) { to = jobTo; } - if (jobAccountId !== undefined) { + if (hasJobAccountIdField) { accountId = jobAccountId; } if (jobMode !== undefined) { @@ -173,7 +176,7 @@ export function resolveFailureDestination( // But preserve explicit 'to' that was set at job level // Treat undefined global mode as "announce" for comparison const globalMode = globalConfig?.mode ?? "announce"; - if (!jobToExplicit && globalMode !== jobMode) { + if (!jobToExplicitValue && globalMode !== jobMode) { to = undefined; } mode = jobMode; @@ -237,12 +240,12 @@ const cronDeliveryLogger = getChildLogger({ subsystem: "cron-delivery" }); export async function sendFailureNotificationAnnounce( deps: CliDeps, + cfg: OpenClawConfig, agentId: string, jobId: string, target: { channel?: string; to?: string; accountId?: string }, message: string, ): Promise { - const cfg = loadConfig(); const resolvedTarget = await resolveDeliveryTarget(cfg, agentId, { channel: target.channel as CronMessageChannel | undefined, to: target.to, diff --git a/src/cron/service.failure-alert.test.ts b/src/cron/service.failure-alert.test.ts index 49ddac71409..b4b662183ab 100644 --- a/src/cron/service.failure-alert.test.ts +++ b/src/cron/service.failure-alert.test.ts @@ -195,4 +195,71 @@ describe("CronService failure alerts", () => { cron.stop(); await store.cleanup(); }); + + it("threads failure alert mode/accountId and skips best-effort jobs", async () => { + const store = await makeStorePath(); + const sendCronFailureAlert = vi.fn(async () => undefined); + const runIsolatedAgentJob = vi.fn(async () => ({ + status: "error" as const, + error: "temporary upstream error", + })); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + cronConfig: { + failureAlert: { + enabled: true, + after: 1, + mode: "webhook", + accountId: "global-account", + }, + }, + log: noopLogger, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob, + sendCronFailureAlert, + }); + + await cron.start(); + const normalJob = await cron.add({ + name: "normal alert job", + enabled: true, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "run report" }, + delivery: { mode: "announce", channel: "telegram", to: "19098680" }, + }); + const bestEffortJob = await cron.add({ + name: "best effort alert job", + enabled: true, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "run report" }, + delivery: { + mode: "announce", + channel: "telegram", + to: "19098680", + bestEffort: true, + }, + }); + + await cron.run(normalJob.id, "force"); + expect(sendCronFailureAlert).toHaveBeenCalledTimes(1); + expect(sendCronFailureAlert).toHaveBeenCalledWith( + expect.objectContaining({ + mode: "webhook", + accountId: "global-account", + }), + ); + + await cron.run(bestEffortJob.id, "force"); + expect(sendCronFailureAlert).toHaveBeenCalledTimes(1); + + cron.stop(); + await store.cleanup(); + }); }); diff --git a/src/gateway/server-cron.ts b/src/gateway/server-cron.ts index 7431e2e8467..cdd014bded9 100644 --- a/src/gateway/server-cron.ts +++ b/src/gateway/server-cron.ts @@ -73,6 +73,66 @@ function resolveCronWebhookTarget(params: { return null; } +function buildCronWebhookHeaders(webhookToken?: string): Record { + const headers: Record = { + "Content-Type": "application/json", + }; + if (webhookToken) { + headers.Authorization = `Bearer ${webhookToken}`; + } + return headers; +} + +async function postCronWebhook(params: { + webhookUrl: string; + webhookToken?: string; + payload: unknown; + logContext: Record; + blockedLog: string; + failedLog: string; + logger: ReturnType; +}): Promise { + const abortController = new AbortController(); + const timeout = setTimeout(() => { + abortController.abort(); + }, CRON_WEBHOOK_TIMEOUT_MS); + + try { + const result = await fetchWithSsrFGuard({ + url: params.webhookUrl, + init: { + method: "POST", + headers: buildCronWebhookHeaders(params.webhookToken), + body: JSON.stringify(params.payload), + signal: abortController.signal, + }, + }); + await result.release(); + } catch (err) { + if (err instanceof SsrFBlockedError) { + params.logger.warn( + { + ...params.logContext, + reason: formatErrorMessage(err), + webhookUrl: redactWebhookUrl(params.webhookUrl), + }, + params.blockedLog, + ); + } else { + params.logger.warn( + { + ...params.logContext, + err: formatErrorMessage(err), + webhookUrl: redactWebhookUrl(params.webhookUrl), + }, + params.failedLog, + ); + } + } finally { + clearTimeout(timeout); + } +} + export function buildGatewayCronService(params: { cfg: ReturnType; deps: CliDeps; @@ -229,6 +289,7 @@ export function buildGatewayCronService(params: { }, sendCronFailureAlert: async ({ job, text, channel, to, mode, accountId }) => { const { agentId, cfg: runtimeConfig } = resolveCronAgent(job.agentId); + const webhookToken = params.cfg.cron?.webhookToken?.trim(); // Webhook mode requires a URL - fail closed if missing if (mode === "webhook" && !to) { @@ -242,61 +303,24 @@ export function buildGatewayCronService(params: { if (mode === "webhook" && to) { const webhookUrl = normalizeHttpWebhookUrl(to); if (webhookUrl) { - const webhookToken = params.cfg.cron?.webhookToken?.trim(); - const headers: Record = { - "Content-Type": "application/json", - }; - if (webhookToken) { - headers.Authorization = `Bearer ${webhookToken}`; - } - const abortController = new AbortController(); - const timeout = setTimeout(() => { - abortController.abort(); - }, CRON_WEBHOOK_TIMEOUT_MS); - - try { - const result = await fetchWithSsrFGuard({ - url: webhookUrl, - init: { - method: "POST", - headers, - body: JSON.stringify({ - jobId: job.id, - jobName: job.name, - message: text, - }), - signal: abortController.signal, - }, - }); - await result.release(); - } catch (err) { - if (err instanceof SsrFBlockedError) { - cronLogger.warn( - { - reason: formatErrorMessage(err), - jobId: job.id, - webhookUrl: redactWebhookUrl(webhookUrl), - }, - "cron: failure alert webhook blocked by SSRF guard", - ); - } else { - cronLogger.warn( - { - err: formatErrorMessage(err), - jobId: job.id, - webhookUrl: redactWebhookUrl(webhookUrl), - }, - "cron: failure alert webhook failed", - ); - } - } finally { - clearTimeout(timeout); - } + await postCronWebhook({ + webhookUrl, + webhookToken, + payload: { + jobId: job.id, + jobName: job.name, + message: text, + }, + logContext: { jobId: job.id }, + blockedLog: "cron: failure alert webhook blocked by SSRF guard", + failedLog: "cron: failure alert webhook failed", + logger: cronLogger, + }); } else { cronLogger.warn( { jobId: job.id, - failureAlertTo: to, + webhookUrl: redactWebhookUrl(to), }, "cron: failure alert webhook URL is invalid, skipping", ); @@ -361,52 +385,16 @@ export function buildGatewayCronService(params: { } if (webhookTarget && evt.summary) { - const headers: Record = { - "Content-Type": "application/json", - }; - if (webhookToken) { - headers.Authorization = `Bearer ${webhookToken}`; - } - const abortController = new AbortController(); - const timeout = setTimeout(() => { - abortController.abort(); - }, CRON_WEBHOOK_TIMEOUT_MS); - void (async () => { - try { - const result = await fetchWithSsrFGuard({ - url: webhookTarget.url, - init: { - method: "POST", - headers, - body: JSON.stringify(evt), - signal: abortController.signal, - }, - }); - await result.release(); - } catch (err) { - if (err instanceof SsrFBlockedError) { - cronLogger.warn( - { - reason: formatErrorMessage(err), - jobId: evt.jobId, - webhookUrl: redactWebhookUrl(webhookTarget.url), - }, - "cron: webhook delivery blocked by SSRF guard", - ); - } else { - cronLogger.warn( - { - err: formatErrorMessage(err), - jobId: evt.jobId, - webhookUrl: redactWebhookUrl(webhookTarget.url), - }, - "cron: webhook delivery failed", - ); - } - } finally { - clearTimeout(timeout); - } + await postCronWebhook({ + webhookUrl: webhookTarget.url, + webhookToken, + payload: evt, + logContext: { jobId: evt.jobId }, + blockedLog: "cron: webhook delivery blocked by SSRF guard", + failedLog: "cron: webhook delivery failed", + logger: cronLogger, + }); })(); } @@ -432,66 +420,31 @@ export function buildGatewayCronService(params: { if (failureDest.mode === "webhook" && failureDest.to) { const webhookUrl = normalizeHttpWebhookUrl(failureDest.to); if (webhookUrl) { - const headers: Record = { - "Content-Type": "application/json", - }; - if (params.cfg.cron?.webhookToken) { - headers.Authorization = `Bearer ${params.cfg.cron.webhookToken.trim()}`; - } - const abortController = new AbortController(); - const timeout = setTimeout(() => { - abortController.abort(); - }, CRON_WEBHOOK_TIMEOUT_MS); - void (async () => { - try { - const result = await fetchWithSsrFGuard({ - url: webhookUrl, - init: { - method: "POST", - headers, - body: JSON.stringify(failurePayload), - signal: abortController.signal, - }, - }); - await result.release(); - } catch (err) { - if (err instanceof SsrFBlockedError) { - cronLogger.warn( - { - reason: formatErrorMessage(err), - jobId: evt.jobId, - webhookUrl: redactWebhookUrl(webhookUrl), - }, - "cron: failure destination webhook blocked by SSRF guard", - ); - } else { - cronLogger.warn( - { - err: formatErrorMessage(err), - jobId: evt.jobId, - webhookUrl: redactWebhookUrl(webhookUrl), - }, - "cron: failure destination webhook failed", - ); - } - } finally { - clearTimeout(timeout); - } + await postCronWebhook({ + webhookUrl, + webhookToken, + payload: failurePayload, + logContext: { jobId: evt.jobId }, + blockedLog: "cron: failure destination webhook blocked by SSRF guard", + failedLog: "cron: failure destination webhook failed", + logger: cronLogger, + }); })(); } else { cronLogger.warn( { jobId: evt.jobId, - failureDestTo: failureDest.to, + webhookUrl: redactWebhookUrl(failureDest.to), }, "cron: failure destination webhook URL is invalid, skipping", ); } } else if (failureDest.mode === "announce") { - const { agentId } = resolveCronAgent(job.agentId); + const { agentId, cfg: runtimeConfig } = resolveCronAgent(job.agentId); void sendFailureNotificationAnnounce( params.deps, + runtimeConfig, agentId, job.id, { diff --git a/ui/src/ui/controllers/cron.test.ts b/ui/src/ui/controllers/cron.test.ts index d3f6b715770..11a32981635 100644 --- a/ui/src/ui/controllers/cron.test.ts +++ b/ui/src/ui/controllers/cron.test.ts @@ -630,6 +630,52 @@ describe("cron controller", () => { cooldownMs: 120_000, channel: "telegram", to: "123456", + mode: "announce", + accountId: undefined, + }, + }, + }); + }); + + it("includes failure alert mode/accountId in cron.update patch", async () => { + const request = vi.fn(async (method: string, _payload?: unknown) => { + if (method === "cron.update") { + return { id: "job-alert-mode" }; + } + if (method === "cron.list") { + return { jobs: [{ id: "job-alert-mode" }] }; + } + if (method === "cron.status") { + return { enabled: true, jobs: 1, nextWakeAtMs: null }; + } + return {}; + }); + const state = createState({ + client: { request } as unknown as CronState["client"], + cronEditingJobId: "job-alert-mode", + cronForm: { + ...DEFAULT_CRON_FORM, + name: "alert mode job", + payloadKind: "agentTurn", + payloadText: "run it", + failureAlertMode: "custom", + failureAlertAfter: "1", + failureAlertDeliveryMode: "webhook", + failureAlertAccountId: "bot-a", + }, + }); + + await addCronJob(state); + + const updateCall = request.mock.calls.find(([method]) => method === "cron.update"); + expect(updateCall).toBeDefined(); + expect(updateCall?.[1]).toMatchObject({ + id: "job-alert-mode", + patch: { + failureAlert: { + after: 1, + mode: "webhook", + accountId: "bot-a", }, }, }); @@ -780,6 +826,8 @@ describe("cron controller", () => { expect(state.cronForm.failureAlertCooldownSeconds).toBe("30"); expect(state.cronForm.failureAlertChannel).toBe("telegram"); expect(state.cronForm.failureAlertTo).toBe("999"); + expect(state.cronForm.failureAlertDeliveryMode).toBe("announce"); + expect(state.cronForm.failureAlertAccountId).toBe(""); }); it("validates key cron form errors", () => { diff --git a/ui/src/ui/types.ts b/ui/src/ui/types.ts index 90125c2af18..e9d3379f418 100644 --- a/ui/src/ui/types.ts +++ b/ui/src/ui/types.ts @@ -491,7 +491,14 @@ export type CronDelivery = { to?: string; accountId?: string; bestEffort?: boolean; - failureDestination?: CronFailureAlert; + failureDestination?: CronFailureDestination; +}; + +export type CronFailureDestination = { + channel?: string; + to?: string; + mode?: "announce" | "webhook"; + accountId?: string; }; export type CronFailureAlert = {