Cron: harden failure destination routing and add regression coverage

This commit is contained in:
Tak Hoffman
2026-03-02 08:51:58 -06:00
parent bf86d07e70
commit f44c91e415
8 changed files with 361 additions and 153 deletions

View File

@@ -200,6 +200,7 @@ Docs: https://docs.openclaw.ai
- Discord/Application ID fallback: parse bot application IDs from token prefixes without numeric precision loss and use token fallback only on transport/timeout failures when probing `/oauth2/applications/@me`. Landed from contributor PR #29695 by @dhananjai1729. Thanks @dhananjai1729.
- Discord/EventQueue timeout config: expose per-account `channels.discord.accounts.<id>.eventQueue.listenerTimeout` (and related queue options) so long-running handlers can avoid Carbon listener timeout drops. Landed from contributor PR #28945 by @Glucksberg. Thanks @Glucksberg.
- CLI/Cron run exit code: return exit code `0` only when `cron run` reports `{ ok: true, ran: true }`, and `1` for non-run/error outcomes so scripting/debugging reflects actual execution status. Landed from contributor PR #31121 by @Sid-Qin. Thanks @Sid-Qin.
- Cron/Failure delivery routing: add `failureAlert.mode` (`announce|webhook`) and `failureAlert.accountId` support, plus `cron.failureDestination` and per-job `delivery.failureDestination` routing with duplicate-target suppression, best-effort skip behavior, and global+job merge semantics. Landed from contributor PR #31059 by @kesor. Thanks @kesor.
- CLI/JSON preflight output: keep `--json` command stdout machine-readable by suppressing doctor preflight note output while still running legacy migration/config doctor flow. (#24368) Thanks @altaywtf.
- Nodes/Screen recording guardrails: cap `nodes` tool `screen_record` `durationMs` to 5 minutes at both schema-validation and runtime invocation layers to prevent long-running blocking captures from unbounded durations. Landed from contributor PR #31106 by @BlueBirdBack. Thanks @BlueBirdBack.
- Telegram/Empty final replies: skip outbound send for null/undefined final text payloads without media so Telegram typing indicators do not linger on `text must be non-empty` errors, with added regression coverage for undefined final payload dispatch. Landed from contributor PRs #30969 by @haosenwang1018 and #30746 by @rylena. Thanks @haosenwang1018 and @rylena.

View File

@@ -694,4 +694,40 @@ describe("cron cli", () => {
});
expect(runOpts.timeout).toBe("45000");
});
// Regression: `cron edit` must carry the failure-alert threshold, delivery
// mode, and account id flags into the `cron.update` patch.
it("patches failure alert mode/accountId on cron edit", async () => {
  callGatewayFromCli.mockClear();

  const argv = [
    "cron",
    "edit",
    "job-1",
    "--failure-alert-after",
    "1",
    "--failure-alert-mode",
    "webhook",
    "--failure-alert-account-id",
    "bot-a",
  ];
  await buildProgram().parseAsync(argv, { from: "user" });

  // Index 2 of the recorded `cron.update` call is read as the request params.
  const gatewayCall = callGatewayFromCli.mock.calls.find((args) => args[0] === "cron.update");
  const params = gatewayCall?.[2] as {
    patch?: {
      failureAlert?: {
        after?: number;
        mode?: "announce" | "webhook";
        accountId?: string;
      };
    };
  };
  const failureAlert = params?.patch?.failureAlert;

  // `--failure-alert-after "1"` is expected to arrive as the number 1.
  expect(failureAlert?.after).toBe(1);
  expect(failureAlert?.mode).toBe("webhook");
  expect(failureAlert?.accountId).toBe("bot-a");
});
});

View File

@@ -1,5 +1,5 @@
import { describe, expect, it } from "vitest";
import { resolveCronDeliveryPlan } from "./delivery.js";
import { resolveCronDeliveryPlan, resolveFailureDestination } from "./delivery.js";
import type { CronJob } from "./types.js";
function makeJob(overrides: Partial<CronJob>): CronJob {
@@ -85,3 +85,96 @@ describe("resolveCronDeliveryPlan", () => {
expect(plan.accountId).toBe("bot-a");
});
});
// Exercises how a job's `delivery.failureDestination` is combined with the
// global failure destination passed as the second argument.
describe("resolveFailureDestination", () => {
  // Primary delivery target reused by every case; only the failure routing varies.
  const primaryTarget = { mode: "announce", channel: "telegram", to: "111" } as const;

  it("merges global defaults with job-level overrides", () => {
    const job = makeJob({
      delivery: {
        ...primaryTarget,
        failureDestination: { channel: "signal", mode: "announce" },
      },
    });

    const resolved = resolveFailureDestination(job, {
      channel: "telegram",
      to: "222",
      mode: "announce",
      accountId: "global-account",
    });

    // The job-level channel wins; `to` and `accountId` come from the global config.
    expect(resolved).toEqual({
      mode: "announce",
      channel: "signal",
      to: "222",
      accountId: "global-account",
    });
  });

  it("returns null for webhook mode without destination URL", () => {
    const job = makeJob({
      delivery: {
        ...primaryTarget,
        failureDestination: { mode: "webhook" },
      },
    });

    // No global config and no `to` on the job: nothing to post to.
    expect(resolveFailureDestination(job, undefined)).toBeNull();
  });

  it("returns null when failure destination matches primary delivery target", () => {
    const job = makeJob({
      delivery: {
        ...primaryTarget,
        accountId: "bot-a",
        failureDestination: {
          mode: "announce",
          channel: "telegram",
          to: "111",
          accountId: "bot-a",
        },
      },
    });

    // Failure routing identical to the primary target is expected to be suppressed.
    expect(resolveFailureDestination(job, undefined)).toBeNull();
  });

  it("allows job-level failure destination fields to clear inherited global values", () => {
    const job = makeJob({
      delivery: {
        ...primaryTarget,
        failureDestination: {
          mode: "announce",
          // The keys are deliberately present with `undefined` values: this case
          // checks that an explicit clear is not replaced by the global value.
          channel: undefined as never,
          to: undefined as never,
          accountId: undefined as never,
        },
      },
    });

    const resolved = resolveFailureDestination(job, {
      channel: "signal",
      to: "group-abc",
      accountId: "global-account",
      mode: "announce",
    });

    expect(resolved).toEqual({
      mode: "announce",
      channel: "last",
      to: undefined,
      accountId: undefined,
    });
  });
});

View File

@@ -1,7 +1,7 @@
import type { CliDeps } from "../cli/deps.js";
import { createOutboundSendDeps } from "../cli/outbound-send-deps.js";
import { loadConfig } from "../config/config.js";
import type { CronFailureDestinationConfig } from "../config/types.cron.js";
import type { OpenClawConfig } from "../config/types.js";
import { formatErrorMessage } from "../infra/errors.js";
import { deliverOutboundPayloads } from "../infra/outbound/deliver.js";
import { resolveAgentOutboundIdentity } from "../infra/outbound/identity.js";
@@ -153,18 +153,21 @@ export function resolveFailureDestination(
const jobTo = normalizeTo(jobFailureDest.to);
const jobAccountId = normalizeAccountId(jobFailureDest.accountId);
const jobMode = normalizeFailureMode(jobFailureDest.mode);
const hasJobChannelField = "channel" in jobFailureDest;
const hasJobToField = "to" in jobFailureDest;
const hasJobAccountIdField = "accountId" in jobFailureDest;
// Track if 'to' was explicitly set at job level
const jobToExplicit = "to" in jobFailureDest && jobFailureDest.to !== undefined;
const jobToExplicitValue = hasJobToField && jobTo !== undefined;
// Only override if explicitly set (not undefined)
if (jobChannel !== undefined) {
// Respect explicit clears from partial patches.
if (hasJobChannelField) {
channel = jobChannel;
}
if (jobTo !== undefined) {
if (hasJobToField) {
to = jobTo;
}
if (jobAccountId !== undefined) {
if (hasJobAccountIdField) {
accountId = jobAccountId;
}
if (jobMode !== undefined) {
@@ -173,7 +176,7 @@ export function resolveFailureDestination(
// But preserve explicit 'to' that was set at job level
// Treat undefined global mode as "announce" for comparison
const globalMode = globalConfig?.mode ?? "announce";
if (!jobToExplicit && globalMode !== jobMode) {
if (!jobToExplicitValue && globalMode !== jobMode) {
to = undefined;
}
mode = jobMode;
@@ -237,12 +240,12 @@ const cronDeliveryLogger = getChildLogger({ subsystem: "cron-delivery" });
export async function sendFailureNotificationAnnounce(
deps: CliDeps,
cfg: OpenClawConfig,
agentId: string,
jobId: string,
target: { channel?: string; to?: string; accountId?: string },
message: string,
): Promise<void> {
const cfg = loadConfig();
const resolvedTarget = await resolveDeliveryTarget(cfg, agentId, {
channel: target.channel as CronMessageChannel | undefined,
to: target.to,

View File

@@ -195,4 +195,71 @@ describe("CronService failure alerts", () => {
cron.stop();
await store.cleanup();
});
// Regression: the global `failureAlert.mode` / `failureAlert.accountId` must reach
// the alert sender, and jobs whose delivery is marked `bestEffort` must not alert.
it("threads failure alert mode/accountId and skips best-effort jobs", async () => {
  const store = await makeStorePath();
  const sendCronFailureAlert = vi.fn(async () => undefined);
  // Every run fails, so the `after: 1` threshold is reached on the first run.
  const runIsolatedAgentJob = vi.fn(async () => ({
    status: "error" as const,
    error: "temporary upstream error",
  }));

  const cron = new CronService({
    storePath: store.storePath,
    cronEnabled: true,
    cronConfig: {
      failureAlert: {
        enabled: true,
        after: 1,
        mode: "webhook",
        accountId: "global-account",
      },
    },
    log: noopLogger,
    enqueueSystemEvent: vi.fn(),
    requestHeartbeatNow: vi.fn(),
    runIsolatedAgentJob,
    sendCronFailureAlert,
  });
  await cron.start();

  // Both jobs are identical apart from their name and the `bestEffort` flag;
  // each call builds fresh objects so the two jobs share no references.
  const addAlertJob = (name: string, bestEffort?: boolean) =>
    cron.add({
      name,
      enabled: true,
      schedule: { kind: "every", everyMs: 60_000 },
      sessionTarget: "isolated",
      wakeMode: "next-heartbeat",
      payload: { kind: "agentTurn", message: "run report" },
      delivery: {
        mode: "announce",
        channel: "telegram",
        to: "19098680",
        ...(bestEffort ? { bestEffort: true } : {}),
      },
    });

  const normalJob = await addAlertJob("normal alert job");
  const bestEffortJob = await addAlertJob("best effort alert job", true);

  await cron.run(normalJob.id, "force");
  expect(sendCronFailureAlert).toHaveBeenCalledTimes(1);
  expect(sendCronFailureAlert).toHaveBeenCalledWith(
    expect.objectContaining({
      mode: "webhook",
      accountId: "global-account",
    }),
  );

  // The call count stays at 1: the failing best-effort job must not trigger an alert.
  await cron.run(bestEffortJob.id, "force");
  expect(sendCronFailureAlert).toHaveBeenCalledTimes(1);

  cron.stop();
  await store.cleanup();
});
});

View File

@@ -73,6 +73,66 @@ function resolveCronWebhookTarget(params: {
return null;
}
/**
 * Builds the HTTP headers sent with a cron webhook POST.
 *
 * The token is trimmed here so callers do not each have to pre-normalize it:
 * a padded token yields a clean `Bearer <token>` value, and a missing, empty,
 * or whitespace-only token yields no `Authorization` header at all rather
 * than an empty bearer credential.
 *
 * @param webhookToken Optional bearer token for the webhook endpoint.
 * @returns Headers containing `Content-Type: application/json` and, when a
 *   non-blank token is supplied, `Authorization: Bearer <token>`.
 */
function buildCronWebhookHeaders(webhookToken?: string): Record<string, string> {
  const headers: Record<string, string> = {
    "Content-Type": "application/json",
  };
  const token = webhookToken?.trim();
  if (token) {
    headers.Authorization = `Bearer ${token}`;
  }
  return headers;
}
/**
 * POSTs a JSON payload to a cron webhook URL through the SSRF-guarded fetch.
 *
 * The request is aborted once `CRON_WEBHOOK_TIMEOUT_MS` elapses. Errors are
 * never rethrown: they are logged as warnings, using `blockedLog` when the
 * SSRF guard rejected the request and `failedLog` for anything else.
 *
 * @param params.webhookUrl Destination URL; logged only in redacted form.
 * @param params.webhookToken Optional token forwarded to the header builder.
 * @param params.payload Value serialized with `JSON.stringify` as the body.
 * @param params.logContext Extra fields merged into every warning entry.
 * @param params.blockedLog Message used when an `SsrFBlockedError` is caught.
 * @param params.failedLog Message used for any other caught error.
 * @param params.logger Logger that receives the warnings.
 */
async function postCronWebhook(params: {
  webhookUrl: string;
  webhookToken?: string;
  payload: unknown;
  logContext: Record<string, unknown>;
  blockedLog: string;
  failedLog: string;
  logger: ReturnType<typeof getChildLogger>;
}): Promise<void> {
  const { webhookUrl, logger } = params;
  const abortController = new AbortController();
  const timer = setTimeout(() => abortController.abort(), CRON_WEBHOOK_TIMEOUT_MS);

  try {
    const response = await fetchWithSsrFGuard({
      url: webhookUrl,
      init: {
        method: "POST",
        headers: buildCronWebhookHeaders(params.webhookToken),
        body: JSON.stringify(params.payload),
        signal: abortController.signal,
      },
    });
    await response.release();
  } catch (err) {
    // Both outcomes log the same shape; only the detail key and message differ.
    const blocked = err instanceof SsrFBlockedError;
    const detail = formatErrorMessage(err);
    const redactedUrl = redactWebhookUrl(webhookUrl);
    const fields: Record<string, unknown> = blocked
      ? { ...params.logContext, reason: detail, webhookUrl: redactedUrl }
      : { ...params.logContext, err: detail, webhookUrl: redactedUrl };
    logger.warn(fields, blocked ? params.blockedLog : params.failedLog);
  } finally {
    // Always release the timer so a finished request cannot trigger a late abort.
    clearTimeout(timer);
  }
}
export function buildGatewayCronService(params: {
cfg: ReturnType<typeof loadConfig>;
deps: CliDeps;
@@ -229,6 +289,7 @@ export function buildGatewayCronService(params: {
},
sendCronFailureAlert: async ({ job, text, channel, to, mode, accountId }) => {
const { agentId, cfg: runtimeConfig } = resolveCronAgent(job.agentId);
const webhookToken = params.cfg.cron?.webhookToken?.trim();
// Webhook mode requires a URL - fail closed if missing
if (mode === "webhook" && !to) {
@@ -242,61 +303,24 @@ export function buildGatewayCronService(params: {
if (mode === "webhook" && to) {
const webhookUrl = normalizeHttpWebhookUrl(to);
if (webhookUrl) {
const webhookToken = params.cfg.cron?.webhookToken?.trim();
const headers: Record<string, string> = {
"Content-Type": "application/json",
};
if (webhookToken) {
headers.Authorization = `Bearer ${webhookToken}`;
}
const abortController = new AbortController();
const timeout = setTimeout(() => {
abortController.abort();
}, CRON_WEBHOOK_TIMEOUT_MS);
try {
const result = await fetchWithSsrFGuard({
url: webhookUrl,
init: {
method: "POST",
headers,
body: JSON.stringify({
jobId: job.id,
jobName: job.name,
message: text,
}),
signal: abortController.signal,
},
});
await result.release();
} catch (err) {
if (err instanceof SsrFBlockedError) {
cronLogger.warn(
{
reason: formatErrorMessage(err),
jobId: job.id,
webhookUrl: redactWebhookUrl(webhookUrl),
},
"cron: failure alert webhook blocked by SSRF guard",
);
} else {
cronLogger.warn(
{
err: formatErrorMessage(err),
jobId: job.id,
webhookUrl: redactWebhookUrl(webhookUrl),
},
"cron: failure alert webhook failed",
);
}
} finally {
clearTimeout(timeout);
}
await postCronWebhook({
webhookUrl,
webhookToken,
payload: {
jobId: job.id,
jobName: job.name,
message: text,
},
logContext: { jobId: job.id },
blockedLog: "cron: failure alert webhook blocked by SSRF guard",
failedLog: "cron: failure alert webhook failed",
logger: cronLogger,
});
} else {
cronLogger.warn(
{
jobId: job.id,
failureAlertTo: to,
webhookUrl: redactWebhookUrl(to),
},
"cron: failure alert webhook URL is invalid, skipping",
);
@@ -361,52 +385,16 @@ export function buildGatewayCronService(params: {
}
if (webhookTarget && evt.summary) {
const headers: Record<string, string> = {
"Content-Type": "application/json",
};
if (webhookToken) {
headers.Authorization = `Bearer ${webhookToken}`;
}
const abortController = new AbortController();
const timeout = setTimeout(() => {
abortController.abort();
}, CRON_WEBHOOK_TIMEOUT_MS);
void (async () => {
try {
const result = await fetchWithSsrFGuard({
url: webhookTarget.url,
init: {
method: "POST",
headers,
body: JSON.stringify(evt),
signal: abortController.signal,
},
});
await result.release();
} catch (err) {
if (err instanceof SsrFBlockedError) {
cronLogger.warn(
{
reason: formatErrorMessage(err),
jobId: evt.jobId,
webhookUrl: redactWebhookUrl(webhookTarget.url),
},
"cron: webhook delivery blocked by SSRF guard",
);
} else {
cronLogger.warn(
{
err: formatErrorMessage(err),
jobId: evt.jobId,
webhookUrl: redactWebhookUrl(webhookTarget.url),
},
"cron: webhook delivery failed",
);
}
} finally {
clearTimeout(timeout);
}
await postCronWebhook({
webhookUrl: webhookTarget.url,
webhookToken,
payload: evt,
logContext: { jobId: evt.jobId },
blockedLog: "cron: webhook delivery blocked by SSRF guard",
failedLog: "cron: webhook delivery failed",
logger: cronLogger,
});
})();
}
@@ -432,66 +420,31 @@ export function buildGatewayCronService(params: {
if (failureDest.mode === "webhook" && failureDest.to) {
const webhookUrl = normalizeHttpWebhookUrl(failureDest.to);
if (webhookUrl) {
const headers: Record<string, string> = {
"Content-Type": "application/json",
};
if (params.cfg.cron?.webhookToken) {
headers.Authorization = `Bearer ${params.cfg.cron.webhookToken.trim()}`;
}
const abortController = new AbortController();
const timeout = setTimeout(() => {
abortController.abort();
}, CRON_WEBHOOK_TIMEOUT_MS);
void (async () => {
try {
const result = await fetchWithSsrFGuard({
url: webhookUrl,
init: {
method: "POST",
headers,
body: JSON.stringify(failurePayload),
signal: abortController.signal,
},
});
await result.release();
} catch (err) {
if (err instanceof SsrFBlockedError) {
cronLogger.warn(
{
reason: formatErrorMessage(err),
jobId: evt.jobId,
webhookUrl: redactWebhookUrl(webhookUrl),
},
"cron: failure destination webhook blocked by SSRF guard",
);
} else {
cronLogger.warn(
{
err: formatErrorMessage(err),
jobId: evt.jobId,
webhookUrl: redactWebhookUrl(webhookUrl),
},
"cron: failure destination webhook failed",
);
}
} finally {
clearTimeout(timeout);
}
await postCronWebhook({
webhookUrl,
webhookToken,
payload: failurePayload,
logContext: { jobId: evt.jobId },
blockedLog: "cron: failure destination webhook blocked by SSRF guard",
failedLog: "cron: failure destination webhook failed",
logger: cronLogger,
});
})();
} else {
cronLogger.warn(
{
jobId: evt.jobId,
failureDestTo: failureDest.to,
webhookUrl: redactWebhookUrl(failureDest.to),
},
"cron: failure destination webhook URL is invalid, skipping",
);
}
} else if (failureDest.mode === "announce") {
const { agentId } = resolveCronAgent(job.agentId);
const { agentId, cfg: runtimeConfig } = resolveCronAgent(job.agentId);
void sendFailureNotificationAnnounce(
params.deps,
runtimeConfig,
agentId,
job.id,
{

View File

@@ -630,6 +630,52 @@ describe("cron controller", () => {
cooldownMs: 120_000,
channel: "telegram",
to: "123456",
mode: "announce",
accountId: undefined,
},
},
});
});
it("includes failure alert mode/accountId in cron.update patch", async () => {
const request = vi.fn(async (method: string, _payload?: unknown) => {
if (method === "cron.update") {
return { id: "job-alert-mode" };
}
if (method === "cron.list") {
return { jobs: [{ id: "job-alert-mode" }] };
}
if (method === "cron.status") {
return { enabled: true, jobs: 1, nextWakeAtMs: null };
}
return {};
});
const state = createState({
client: { request } as unknown as CronState["client"],
cronEditingJobId: "job-alert-mode",
cronForm: {
...DEFAULT_CRON_FORM,
name: "alert mode job",
payloadKind: "agentTurn",
payloadText: "run it",
failureAlertMode: "custom",
failureAlertAfter: "1",
failureAlertDeliveryMode: "webhook",
failureAlertAccountId: "bot-a",
},
});
await addCronJob(state);
const updateCall = request.mock.calls.find(([method]) => method === "cron.update");
expect(updateCall).toBeDefined();
expect(updateCall?.[1]).toMatchObject({
id: "job-alert-mode",
patch: {
failureAlert: {
after: 1,
mode: "webhook",
accountId: "bot-a",
},
},
});
@@ -780,6 +826,8 @@ describe("cron controller", () => {
expect(state.cronForm.failureAlertCooldownSeconds).toBe("30");
expect(state.cronForm.failureAlertChannel).toBe("telegram");
expect(state.cronForm.failureAlertTo).toBe("999");
expect(state.cronForm.failureAlertDeliveryMode).toBe("announce");
expect(state.cronForm.failureAlertAccountId).toBe("");
});
it("validates key cron form errors", () => {

View File

@@ -491,7 +491,14 @@ export type CronDelivery = {
to?: string;
accountId?: string;
bestEffort?: boolean;
failureDestination?: CronFailureAlert;
failureDestination?: CronFailureDestination;
};
/**
 * Routing target for a cron job's failure notifications, as carried by
 * `CronDelivery.failureDestination`. Every field is optional.
 */
export type CronFailureDestination = {
  /** How the failure is delivered: `"announce"` or `"webhook"`. */
  mode?: "announce" | "webhook";
  /** Channel identifier for the notification. */
  channel?: string;
  /**
   * Recipient of the notification.
   * NOTE(review): presumably an HTTP(S) URL when `mode` is `"webhook"` — confirm against the gateway.
   */
  to?: string;
  /** Account identifier. NOTE(review): presumably selects the sending account — confirm. */
  accountId?: string;
};
export type CronFailureAlert = {