mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 10:20:42 +00:00
refactor(cron): split notification routing
This commit is contained in:
@@ -13,7 +13,11 @@ import {
|
||||
type CronDeliveryPlan,
|
||||
resolveCronDeliveryPlan,
|
||||
} from "./delivery-plan.js";
|
||||
import { resolveDeliveryTarget } from "./isolated-agent/delivery-target.js";
|
||||
import {
|
||||
resolveDeliveryTarget,
|
||||
type DeliveryTargetResolution,
|
||||
} from "./isolated-agent/delivery-target.js";
|
||||
import { resolveCronNotificationSessionKey } from "./session-target.js";
|
||||
import type { CronMessageChannel } from "./types.js";
|
||||
|
||||
export {
|
||||
@@ -27,65 +31,143 @@ export {
|
||||
const FAILURE_NOTIFICATION_TIMEOUT_MS = 30_000;
|
||||
const cronDeliveryLogger = getChildLogger({ subsystem: "cron-delivery" });
|
||||
|
||||
export type CronAnnounceTarget = {
|
||||
channel?: string;
|
||||
to?: string;
|
||||
accountId?: string;
|
||||
sessionKey?: string;
|
||||
};
|
||||
|
||||
type SuccessfulDeliveryTarget = Extract<DeliveryTargetResolution, { ok: true }>;
|
||||
|
||||
async function resolveCronAnnounceDelivery(params: {
|
||||
cfg: OpenClawConfig;
|
||||
agentId: string;
|
||||
jobId: string;
|
||||
target: CronAnnounceTarget;
|
||||
}): Promise<
|
||||
| {
|
||||
ok: true;
|
||||
resolvedTarget: SuccessfulDeliveryTarget;
|
||||
session: ReturnType<typeof buildOutboundSessionContext>;
|
||||
identity: ReturnType<typeof resolveAgentOutboundIdentity>;
|
||||
}
|
||||
| { ok: false; error: Error }
|
||||
> {
|
||||
const resolvedTarget = await resolveDeliveryTarget(params.cfg, params.agentId, {
|
||||
channel: params.target.channel as CronMessageChannel | undefined,
|
||||
to: params.target.to,
|
||||
accountId: params.target.accountId,
|
||||
sessionKey: params.target.sessionKey,
|
||||
});
|
||||
|
||||
if (!resolvedTarget.ok) {
|
||||
return { ok: false, error: resolvedTarget.error };
|
||||
}
|
||||
|
||||
const identity = resolveAgentOutboundIdentity(params.cfg, params.agentId);
|
||||
const session = buildOutboundSessionContext({
|
||||
cfg: params.cfg,
|
||||
agentId: params.agentId,
|
||||
sessionKey: resolveCronNotificationSessionKey({
|
||||
jobId: params.jobId,
|
||||
sessionKey: params.target.sessionKey,
|
||||
}),
|
||||
});
|
||||
|
||||
return {
|
||||
ok: true,
|
||||
resolvedTarget,
|
||||
session,
|
||||
identity,
|
||||
};
|
||||
}
|
||||
|
||||
async function deliverCronAnnouncePayload(params: {
|
||||
deps: CliDeps;
|
||||
cfg: OpenClawConfig;
|
||||
delivery: {
|
||||
resolvedTarget: SuccessfulDeliveryTarget;
|
||||
session: ReturnType<typeof buildOutboundSessionContext>;
|
||||
identity: ReturnType<typeof resolveAgentOutboundIdentity>;
|
||||
};
|
||||
message: string;
|
||||
abortSignal: AbortSignal;
|
||||
}): Promise<void> {
|
||||
await deliverOutboundPayloads({
|
||||
cfg: params.cfg,
|
||||
channel: params.delivery.resolvedTarget.channel,
|
||||
to: params.delivery.resolvedTarget.to,
|
||||
accountId: params.delivery.resolvedTarget.accountId,
|
||||
threadId: params.delivery.resolvedTarget.threadId,
|
||||
payloads: [{ text: params.message }],
|
||||
session: params.delivery.session,
|
||||
identity: params.delivery.identity,
|
||||
bestEffort: false,
|
||||
deps: createOutboundSendDeps(params.deps),
|
||||
abortSignal: params.abortSignal,
|
||||
});
|
||||
}
|
||||
|
||||
export async function sendCronAnnouncePayloadStrict(params: {
|
||||
deps: CliDeps;
|
||||
cfg: OpenClawConfig;
|
||||
agentId: string;
|
||||
jobId: string;
|
||||
target: CronAnnounceTarget;
|
||||
message: string;
|
||||
abortSignal: AbortSignal;
|
||||
}): Promise<void> {
|
||||
const delivery = await resolveCronAnnounceDelivery(params);
|
||||
if (!delivery.ok) {
|
||||
throw delivery.error;
|
||||
}
|
||||
await deliverCronAnnouncePayload({
|
||||
deps: params.deps,
|
||||
cfg: params.cfg,
|
||||
delivery,
|
||||
message: params.message,
|
||||
abortSignal: params.abortSignal,
|
||||
});
|
||||
}
|
||||
|
||||
export async function sendFailureNotificationAnnounce(
|
||||
deps: CliDeps,
|
||||
cfg: OpenClawConfig,
|
||||
agentId: string,
|
||||
jobId: string,
|
||||
target: { channel?: string; to?: string; accountId?: string; sessionKey?: string },
|
||||
target: CronAnnounceTarget,
|
||||
message: string,
|
||||
): Promise<void> {
|
||||
const resolvedTarget = await resolveDeliveryTarget(cfg, agentId, {
|
||||
channel: target.channel as CronMessageChannel | undefined,
|
||||
to: target.to,
|
||||
accountId: target.accountId,
|
||||
sessionKey: target.sessionKey,
|
||||
});
|
||||
const delivery = await resolveCronAnnounceDelivery({ cfg, agentId, jobId, target });
|
||||
|
||||
if (!resolvedTarget.ok) {
|
||||
if (!delivery.ok) {
|
||||
cronDeliveryLogger.warn(
|
||||
{ error: resolvedTarget.error.message },
|
||||
{ error: delivery.error.message },
|
||||
"cron: failed to resolve failure destination target",
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const identity = resolveAgentOutboundIdentity(cfg, agentId);
|
||||
const deliverySessionKey =
|
||||
typeof target.sessionKey === "string" && target.sessionKey.trim()
|
||||
? target.sessionKey.trim()
|
||||
: `cron:${jobId}:failure`;
|
||||
const session = buildOutboundSessionContext({
|
||||
cfg,
|
||||
agentId,
|
||||
sessionKey: deliverySessionKey,
|
||||
});
|
||||
|
||||
const abortController = new AbortController();
|
||||
const timeout = setTimeout(() => {
|
||||
abortController.abort();
|
||||
}, FAILURE_NOTIFICATION_TIMEOUT_MS);
|
||||
|
||||
try {
|
||||
await deliverOutboundPayloads({
|
||||
await deliverCronAnnouncePayload({
|
||||
deps,
|
||||
cfg,
|
||||
channel: resolvedTarget.channel,
|
||||
to: resolvedTarget.to,
|
||||
accountId: resolvedTarget.accountId,
|
||||
threadId: resolvedTarget.threadId,
|
||||
payloads: [{ text: message }],
|
||||
session,
|
||||
identity,
|
||||
bestEffort: false,
|
||||
deps: createOutboundSendDeps(deps),
|
||||
delivery,
|
||||
message,
|
||||
abortSignal: abortController.signal,
|
||||
});
|
||||
} catch (err) {
|
||||
cronDeliveryLogger.warn(
|
||||
{
|
||||
err: formatErrorMessage(err),
|
||||
channel: resolvedTarget.channel,
|
||||
to: resolvedTarget.to,
|
||||
channel: delivery.resolvedTarget.channel,
|
||||
to: delivery.resolvedTarget.to,
|
||||
},
|
||||
"cron: failure destination announce failed",
|
||||
);
|
||||
|
||||
48
src/cron/session-target.test.ts
Normal file
48
src/cron/session-target.test.ts
Normal file
@@ -0,0 +1,48 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import {
|
||||
resolveCronDeliverySessionKey,
|
||||
resolveCronFailureNotificationSessionKey,
|
||||
resolveCronNotificationSessionKey,
|
||||
resolveCronSessionTargetSessionKey,
|
||||
} from "./session-target.js";
|
||||
|
||||
describe("cron session target helpers", () => {
|
||||
it("extracts and trims persistent session targets", () => {
|
||||
expect(resolveCronSessionTargetSessionKey("session: agent:main:telegram:direct:123 ")).toBe(
|
||||
"agent:main:telegram:direct:123",
|
||||
);
|
||||
});
|
||||
|
||||
it("rejects unsafe persistent session targets", () => {
|
||||
expect(() => resolveCronSessionTargetSessionKey("session:../../outside")).toThrow(
|
||||
"invalid cron sessionTarget session id",
|
||||
);
|
||||
});
|
||||
|
||||
it("prefers sessionTarget over creator sessionKey for delivery", () => {
|
||||
expect(
|
||||
resolveCronDeliverySessionKey({
|
||||
sessionTarget: "session:agent:main:telegram:direct:123",
|
||||
sessionKey: "agent:main:telegram:group:ops:sender:123",
|
||||
}),
|
||||
).toBe("agent:main:telegram:direct:123");
|
||||
});
|
||||
|
||||
it("falls back to trimmed creator sessionKey for delivery", () => {
|
||||
expect(
|
||||
resolveCronDeliverySessionKey({
|
||||
sessionTarget: "isolated",
|
||||
sessionKey: " agent:main:telegram:group:ops:sender:123 ",
|
||||
}),
|
||||
).toBe("agent:main:telegram:group:ops:sender:123");
|
||||
});
|
||||
|
||||
it("uses cron failure session fallback when no delivery session exists", () => {
|
||||
expect(resolveCronNotificationSessionKey({ jobId: "job-1", sessionKey: " " })).toBe(
|
||||
"cron:job-1:failure",
|
||||
);
|
||||
expect(
|
||||
resolveCronFailureNotificationSessionKey({ id: "job-2", sessionTarget: "isolated" }),
|
||||
).toBe("cron:job-2:failure");
|
||||
});
|
||||
});
|
||||
@@ -36,3 +36,23 @@ export function resolveCronDeliverySessionKey(job: {
|
||||
? job.sessionKey.trim()
|
||||
: undefined;
|
||||
}
|
||||
|
||||
export function resolveCronNotificationSessionKey(params: {
|
||||
jobId: string;
|
||||
sessionKey?: string | null;
|
||||
}): string {
|
||||
return typeof params.sessionKey === "string" && params.sessionKey.trim()
|
||||
? params.sessionKey.trim()
|
||||
: `cron:${params.jobId}:failure`;
|
||||
}
|
||||
|
||||
export function resolveCronFailureNotificationSessionKey(job: {
|
||||
id: string;
|
||||
sessionTarget?: string | null;
|
||||
sessionKey?: string | null;
|
||||
}): string {
|
||||
return resolveCronNotificationSessionKey({
|
||||
jobId: job.id,
|
||||
sessionKey: resolveCronDeliverySessionKey(job),
|
||||
});
|
||||
}
|
||||
|
||||
358
src/gateway/server-cron-notifications.ts
Normal file
358
src/gateway/server-cron-notifications.ts
Normal file
@@ -0,0 +1,358 @@
|
||||
import type { CliDeps } from "../cli/deps.types.js";
|
||||
import type { CronFailureDestinationConfig } from "../config/types.cron.js";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import {
|
||||
resolveCronDeliveryPlan,
|
||||
resolveFailureDestination,
|
||||
sendCronAnnouncePayloadStrict,
|
||||
sendFailureNotificationAnnounce,
|
||||
} from "../cron/delivery.js";
|
||||
import type { CronEvent } from "../cron/service.js";
|
||||
import { resolveCronDeliverySessionKey } from "../cron/session-target.js";
|
||||
import type { CronJob, CronMessageChannel } from "../cron/types.js";
|
||||
import { normalizeHttpWebhookUrl } from "../cron/webhook-url.js";
|
||||
import { formatErrorMessage } from "../infra/errors.js";
|
||||
import { fetchWithSsrFGuard } from "../infra/net/fetch-guard.js";
|
||||
import { SsrFBlockedError } from "../infra/net/ssrf.js";
|
||||
import {
|
||||
normalizeOptionalLowercaseString,
|
||||
normalizeOptionalString,
|
||||
} from "../shared/string-coerce.js";
|
||||
|
||||
const CRON_WEBHOOK_TIMEOUT_MS = 10_000;
|
||||
|
||||
type CronLogger = {
|
||||
warn: (obj: unknown, msg?: string) => void;
|
||||
};
|
||||
|
||||
type CronAgentResolver = (requested?: string | null) => {
|
||||
agentId: string;
|
||||
cfg: OpenClawConfig;
|
||||
};
|
||||
|
||||
type CronWebhookTarget = {
|
||||
url: string;
|
||||
source: "delivery" | "legacy";
|
||||
};
|
||||
|
||||
function redactWebhookUrl(url: string): string {
|
||||
try {
|
||||
const parsed = new URL(url);
|
||||
return `${parsed.origin}${parsed.pathname}`;
|
||||
} catch {
|
||||
return "<invalid-webhook-url>";
|
||||
}
|
||||
}
|
||||
|
||||
function resolveCronWebhookTarget(params: {
|
||||
delivery?: { mode?: string; to?: string };
|
||||
legacyNotify?: boolean;
|
||||
legacyWebhook?: string;
|
||||
}): CronWebhookTarget | null {
|
||||
const mode = normalizeOptionalLowercaseString(params.delivery?.mode);
|
||||
if (mode === "webhook") {
|
||||
const url = normalizeHttpWebhookUrl(params.delivery?.to);
|
||||
return url ? { url, source: "delivery" } : null;
|
||||
}
|
||||
|
||||
if (params.legacyNotify) {
|
||||
const legacyUrl = normalizeHttpWebhookUrl(params.legacyWebhook);
|
||||
if (legacyUrl) {
|
||||
return { url: legacyUrl, source: "legacy" };
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
function buildCronWebhookHeaders(webhookToken?: string): Record<string, string> {
|
||||
const headers: Record<string, string> = {
|
||||
"Content-Type": "application/json",
|
||||
};
|
||||
if (webhookToken) {
|
||||
headers.Authorization = `Bearer ${webhookToken}`;
|
||||
}
|
||||
return headers;
|
||||
}
|
||||
|
||||
async function postCronWebhook(params: {
|
||||
webhookUrl: string;
|
||||
webhookToken?: string;
|
||||
payload: unknown;
|
||||
logContext: Record<string, unknown>;
|
||||
blockedLog: string;
|
||||
failedLog: string;
|
||||
logger: CronLogger;
|
||||
}): Promise<void> {
|
||||
const abortController = new AbortController();
|
||||
const timeout = setTimeout(() => {
|
||||
abortController.abort();
|
||||
}, CRON_WEBHOOK_TIMEOUT_MS);
|
||||
|
||||
try {
|
||||
const result = await fetchWithSsrFGuard({
|
||||
url: params.webhookUrl,
|
||||
init: {
|
||||
method: "POST",
|
||||
headers: buildCronWebhookHeaders(params.webhookToken),
|
||||
body: JSON.stringify(params.payload),
|
||||
signal: abortController.signal,
|
||||
},
|
||||
});
|
||||
await result.release();
|
||||
} catch (err) {
|
||||
if (err instanceof SsrFBlockedError) {
|
||||
params.logger.warn(
|
||||
{
|
||||
...params.logContext,
|
||||
reason: formatErrorMessage(err),
|
||||
webhookUrl: redactWebhookUrl(params.webhookUrl),
|
||||
},
|
||||
params.blockedLog,
|
||||
);
|
||||
} else {
|
||||
params.logger.warn(
|
||||
{
|
||||
...params.logContext,
|
||||
err: formatErrorMessage(err),
|
||||
webhookUrl: redactWebhookUrl(params.webhookUrl),
|
||||
},
|
||||
params.failedLog,
|
||||
);
|
||||
}
|
||||
} finally {
|
||||
clearTimeout(timeout);
|
||||
}
|
||||
}
|
||||
|
||||
export async function sendGatewayCronFailureAlert(params: {
|
||||
deps: CliDeps;
|
||||
logger: CronLogger;
|
||||
resolveCronAgent: CronAgentResolver;
|
||||
webhookToken?: unknown;
|
||||
job: CronJob;
|
||||
text: string;
|
||||
channel: CronMessageChannel;
|
||||
to?: string;
|
||||
mode?: "announce" | "webhook";
|
||||
accountId?: string;
|
||||
}): Promise<void> {
|
||||
const { agentId, cfg: runtimeConfig } = params.resolveCronAgent(params.job.agentId);
|
||||
const webhookToken = normalizeOptionalString(params.webhookToken);
|
||||
|
||||
if (params.mode === "webhook" && !params.to) {
|
||||
params.logger.warn(
|
||||
{ jobId: params.job.id },
|
||||
"cron: failure alert webhook mode requires URL, skipping",
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if (params.mode === "webhook" && params.to) {
|
||||
const webhookUrl = normalizeHttpWebhookUrl(params.to);
|
||||
if (webhookUrl) {
|
||||
await postCronWebhook({
|
||||
webhookUrl,
|
||||
webhookToken,
|
||||
payload: {
|
||||
jobId: params.job.id,
|
||||
jobName: params.job.name,
|
||||
message: params.text,
|
||||
},
|
||||
logContext: { jobId: params.job.id },
|
||||
blockedLog: "cron: failure alert webhook blocked by SSRF guard",
|
||||
failedLog: "cron: failure alert webhook failed",
|
||||
logger: params.logger,
|
||||
});
|
||||
} else {
|
||||
params.logger.warn(
|
||||
{
|
||||
jobId: params.job.id,
|
||||
webhookUrl: redactWebhookUrl(params.to),
|
||||
},
|
||||
"cron: failure alert webhook URL is invalid, skipping",
|
||||
);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
const abortController = new AbortController();
|
||||
await sendCronAnnouncePayloadStrict({
|
||||
deps: params.deps,
|
||||
cfg: runtimeConfig,
|
||||
agentId,
|
||||
jobId: params.job.id,
|
||||
target: {
|
||||
channel: params.channel,
|
||||
to: params.to,
|
||||
accountId: params.accountId,
|
||||
sessionKey: resolveCronDeliverySessionKey(params.job),
|
||||
},
|
||||
message: params.text,
|
||||
abortSignal: abortController.signal,
|
||||
});
|
||||
}
|
||||
|
||||
export function dispatchGatewayCronFinishedNotifications(params: {
|
||||
evt: CronEvent;
|
||||
job?: CronJob;
|
||||
deps: CliDeps;
|
||||
logger: CronLogger;
|
||||
resolveCronAgent: CronAgentResolver;
|
||||
webhookToken?: unknown;
|
||||
legacyWebhook?: unknown;
|
||||
globalFailureDestination?: CronFailureDestinationConfig;
|
||||
warnedLegacyWebhookJobs: Set<string>;
|
||||
}): void {
|
||||
const webhookToken = normalizeOptionalString(params.webhookToken);
|
||||
const legacyWebhook = normalizeOptionalString(params.legacyWebhook);
|
||||
const legacyNotify = (params.job as { notify?: unknown } | undefined)?.notify === true;
|
||||
const webhookTarget = resolveCronWebhookTarget({
|
||||
delivery:
|
||||
params.job?.delivery && typeof params.job.delivery.mode === "string"
|
||||
? { mode: params.job.delivery.mode, to: params.job.delivery.to }
|
||||
: undefined,
|
||||
legacyNotify,
|
||||
legacyWebhook,
|
||||
});
|
||||
|
||||
if (!webhookTarget && params.job?.delivery?.mode === "webhook") {
|
||||
params.logger.warn(
|
||||
{
|
||||
jobId: params.evt.jobId,
|
||||
deliveryTo: params.job.delivery.to,
|
||||
},
|
||||
"cron: skipped webhook delivery, delivery.to must be a valid http(s) URL",
|
||||
);
|
||||
}
|
||||
|
||||
if (webhookTarget?.source === "legacy" && !params.warnedLegacyWebhookJobs.has(params.evt.jobId)) {
|
||||
params.warnedLegacyWebhookJobs.add(params.evt.jobId);
|
||||
params.logger.warn(
|
||||
{
|
||||
jobId: params.evt.jobId,
|
||||
legacyWebhook: redactWebhookUrl(webhookTarget.url),
|
||||
},
|
||||
"cron: deprecated notify+cron.webhook fallback in use, migrate to delivery.mode=webhook with delivery.to",
|
||||
);
|
||||
}
|
||||
|
||||
if (webhookTarget && params.evt.summary) {
|
||||
void (async () => {
|
||||
await postCronWebhook({
|
||||
webhookUrl: webhookTarget.url,
|
||||
webhookToken,
|
||||
payload: params.evt,
|
||||
logContext: { jobId: params.evt.jobId },
|
||||
blockedLog: "cron: webhook delivery blocked by SSRF guard",
|
||||
failedLog: "cron: webhook delivery failed",
|
||||
logger: params.logger,
|
||||
});
|
||||
})();
|
||||
}
|
||||
|
||||
dispatchCronFailureDestinationNotifications({
|
||||
evt: params.evt,
|
||||
job: params.job,
|
||||
deps: params.deps,
|
||||
logger: params.logger,
|
||||
resolveCronAgent: params.resolveCronAgent,
|
||||
webhookToken,
|
||||
globalFailureDestination: params.globalFailureDestination,
|
||||
});
|
||||
}
|
||||
|
||||
function dispatchCronFailureDestinationNotifications(params: {
|
||||
evt: CronEvent;
|
||||
job?: CronJob;
|
||||
deps: CliDeps;
|
||||
logger: CronLogger;
|
||||
resolveCronAgent: CronAgentResolver;
|
||||
webhookToken?: string;
|
||||
globalFailureDestination?: CronFailureDestinationConfig;
|
||||
}): void {
|
||||
if (params.evt.status !== "error" || !params.job || params.job.delivery?.bestEffort === true) {
|
||||
return;
|
||||
}
|
||||
|
||||
const failureMessage = `Cron job "${params.job.name}" failed: ${params.evt.error ?? "unknown error"}`;
|
||||
const failureDest = resolveFailureDestination(params.job, params.globalFailureDestination);
|
||||
const deliverySessionKey = resolveCronDeliverySessionKey(params.job);
|
||||
|
||||
if (failureDest) {
|
||||
const failurePayload = {
|
||||
jobId: params.job.id,
|
||||
jobName: params.job.name,
|
||||
message: failureMessage,
|
||||
status: params.evt.status,
|
||||
error: params.evt.error,
|
||||
runAtMs: params.evt.runAtMs,
|
||||
durationMs: params.evt.durationMs,
|
||||
nextRunAtMs: params.evt.nextRunAtMs,
|
||||
};
|
||||
|
||||
if (failureDest.mode === "webhook" && failureDest.to) {
|
||||
const webhookUrl = normalizeHttpWebhookUrl(failureDest.to);
|
||||
if (webhookUrl) {
|
||||
void (async () => {
|
||||
await postCronWebhook({
|
||||
webhookUrl,
|
||||
webhookToken: params.webhookToken,
|
||||
payload: failurePayload,
|
||||
logContext: { jobId: params.evt.jobId },
|
||||
blockedLog: "cron: failure destination webhook blocked by SSRF guard",
|
||||
failedLog: "cron: failure destination webhook failed",
|
||||
logger: params.logger,
|
||||
});
|
||||
})();
|
||||
} else {
|
||||
params.logger.warn(
|
||||
{
|
||||
jobId: params.evt.jobId,
|
||||
webhookUrl: redactWebhookUrl(failureDest.to),
|
||||
},
|
||||
"cron: failure destination webhook URL is invalid, skipping",
|
||||
);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (failureDest.mode === "announce") {
|
||||
const { agentId, cfg: runtimeConfig } = params.resolveCronAgent(params.job.agentId);
|
||||
void sendFailureNotificationAnnounce(
|
||||
params.deps,
|
||||
runtimeConfig,
|
||||
agentId,
|
||||
params.job.id,
|
||||
{
|
||||
channel: failureDest.channel,
|
||||
to: failureDest.to,
|
||||
accountId: failureDest.accountId,
|
||||
sessionKey: deliverySessionKey,
|
||||
},
|
||||
`⚠️ ${failureMessage}`,
|
||||
);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
const primaryPlan = resolveCronDeliveryPlan(params.job);
|
||||
if (primaryPlan.mode !== "announce" || !primaryPlan.requested) {
|
||||
return;
|
||||
}
|
||||
|
||||
const { agentId, cfg: runtimeConfig } = params.resolveCronAgent(params.job.agentId);
|
||||
void sendFailureNotificationAnnounce(
|
||||
params.deps,
|
||||
runtimeConfig,
|
||||
agentId,
|
||||
params.job.id,
|
||||
{
|
||||
channel: primaryPlan.channel,
|
||||
to: primaryPlan.to,
|
||||
accountId: primaryPlan.accountId,
|
||||
sessionKey: deliverySessionKey,
|
||||
},
|
||||
`⚠️ ${failureMessage}`,
|
||||
);
|
||||
}
|
||||
@@ -1,7 +1,6 @@
|
||||
import { resolveDefaultAgentId } from "../agents/agent-scope.js";
|
||||
import { cleanupBrowserSessionsForLifecycleEnd } from "../browser-lifecycle-cleanup.js";
|
||||
import type { CliDeps } from "../cli/deps.types.js";
|
||||
import { createOutboundSendDeps } from "../cli/outbound-send-deps.js";
|
||||
import { loadConfig } from "../config/config.js";
|
||||
import {
|
||||
canonicalizeMainSessionAlias,
|
||||
@@ -10,40 +9,25 @@ import {
|
||||
} from "../config/sessions.js";
|
||||
import { resolveStorePath } from "../config/sessions/paths.js";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import {
|
||||
resolveCronDeliveryPlan,
|
||||
resolveFailureDestination,
|
||||
sendFailureNotificationAnnounce,
|
||||
} from "../cron/delivery.js";
|
||||
import { runCronIsolatedAgentTurn } from "../cron/isolated-agent.js";
|
||||
import { resolveDeliveryTarget } from "../cron/isolated-agent/delivery-target.js";
|
||||
import {
|
||||
appendCronRunLog,
|
||||
resolveCronRunLogPath,
|
||||
resolveCronRunLogPruneOptions,
|
||||
} from "../cron/run-log.js";
|
||||
import { CronService } from "../cron/service.js";
|
||||
import {
|
||||
resolveCronDeliverySessionKey,
|
||||
resolveCronSessionTargetSessionKey,
|
||||
} from "../cron/session-target.js";
|
||||
import { resolveCronSessionTargetSessionKey } from "../cron/session-target.js";
|
||||
import { resolveCronStorePath } from "../cron/store.js";
|
||||
import { normalizeHttpWebhookUrl } from "../cron/webhook-url.js";
|
||||
import { formatErrorMessage } from "../infra/errors.js";
|
||||
import { runHeartbeatOnce } from "../infra/heartbeat-runner.js";
|
||||
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
|
||||
import { fetchWithSsrFGuard } from "../infra/net/fetch-guard.js";
|
||||
import { SsrFBlockedError } from "../infra/net/ssrf.js";
|
||||
import { deliverOutboundPayloads } from "../infra/outbound/deliver.js";
|
||||
import { buildOutboundSessionContext } from "../infra/outbound/session-context.js";
|
||||
import { enqueueSystemEvent } from "../infra/system-events.js";
|
||||
import { getChildLogger } from "../logging.js";
|
||||
import { normalizeAgentId, toAgentStoreSessionKey } from "../routing/session-key.js";
|
||||
import { defaultRuntime } from "../runtime.js";
|
||||
import {
|
||||
normalizeOptionalLowercaseString,
|
||||
normalizeOptionalString,
|
||||
} from "../shared/string-coerce.js";
|
||||
dispatchGatewayCronFinishedNotifications,
|
||||
sendGatewayCronFailureAlert,
|
||||
} from "./server-cron-notifications.js";
|
||||
|
||||
export type GatewayCronState = {
|
||||
cron: CronService;
|
||||
@@ -51,103 +35,6 @@ export type GatewayCronState = {
|
||||
cronEnabled: boolean;
|
||||
};
|
||||
|
||||
const CRON_WEBHOOK_TIMEOUT_MS = 10_000;
|
||||
|
||||
function redactWebhookUrl(url: string): string {
|
||||
try {
|
||||
const parsed = new URL(url);
|
||||
return `${parsed.origin}${parsed.pathname}`;
|
||||
} catch {
|
||||
return "<invalid-webhook-url>";
|
||||
}
|
||||
}
|
||||
|
||||
type CronWebhookTarget = {
|
||||
url: string;
|
||||
source: "delivery" | "legacy";
|
||||
};
|
||||
|
||||
function resolveCronWebhookTarget(params: {
|
||||
delivery?: { mode?: string; to?: string };
|
||||
legacyNotify?: boolean;
|
||||
legacyWebhook?: string;
|
||||
}): CronWebhookTarget | null {
|
||||
const mode = normalizeOptionalLowercaseString(params.delivery?.mode);
|
||||
if (mode === "webhook") {
|
||||
const url = normalizeHttpWebhookUrl(params.delivery?.to);
|
||||
return url ? { url, source: "delivery" } : null;
|
||||
}
|
||||
|
||||
if (params.legacyNotify) {
|
||||
const legacyUrl = normalizeHttpWebhookUrl(params.legacyWebhook);
|
||||
if (legacyUrl) {
|
||||
return { url: legacyUrl, source: "legacy" };
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
function buildCronWebhookHeaders(webhookToken?: string): Record<string, string> {
|
||||
const headers: Record<string, string> = {
|
||||
"Content-Type": "application/json",
|
||||
};
|
||||
if (webhookToken) {
|
||||
headers.Authorization = `Bearer ${webhookToken}`;
|
||||
}
|
||||
return headers;
|
||||
}
|
||||
|
||||
async function postCronWebhook(params: {
|
||||
webhookUrl: string;
|
||||
webhookToken?: string;
|
||||
payload: unknown;
|
||||
logContext: Record<string, unknown>;
|
||||
blockedLog: string;
|
||||
failedLog: string;
|
||||
logger: ReturnType<typeof getChildLogger>;
|
||||
}): Promise<void> {
|
||||
const abortController = new AbortController();
|
||||
const timeout = setTimeout(() => {
|
||||
abortController.abort();
|
||||
}, CRON_WEBHOOK_TIMEOUT_MS);
|
||||
|
||||
try {
|
||||
const result = await fetchWithSsrFGuard({
|
||||
url: params.webhookUrl,
|
||||
init: {
|
||||
method: "POST",
|
||||
headers: buildCronWebhookHeaders(params.webhookToken),
|
||||
body: JSON.stringify(params.payload),
|
||||
signal: abortController.signal,
|
||||
},
|
||||
});
|
||||
await result.release();
|
||||
} catch (err) {
|
||||
if (err instanceof SsrFBlockedError) {
|
||||
params.logger.warn(
|
||||
{
|
||||
...params.logContext,
|
||||
reason: formatErrorMessage(err),
|
||||
webhookUrl: redactWebhookUrl(params.webhookUrl),
|
||||
},
|
||||
params.blockedLog,
|
||||
);
|
||||
} else {
|
||||
params.logger.warn(
|
||||
{
|
||||
...params.logContext,
|
||||
err: formatErrorMessage(err),
|
||||
webhookUrl: redactWebhookUrl(params.webhookUrl),
|
||||
},
|
||||
params.failedLog,
|
||||
);
|
||||
}
|
||||
} finally {
|
||||
clearTimeout(timeout);
|
||||
}
|
||||
}
|
||||
|
||||
export function buildGatewayCronService(params: {
|
||||
cfg: OpenClawConfig;
|
||||
deps: CliDeps;
|
||||
@@ -355,207 +242,36 @@ export function buildGatewayCronService(params: {
|
||||
});
|
||||
}
|
||||
},
|
||||
sendCronFailureAlert: async ({ job, text, channel, to, mode, accountId }) => {
|
||||
const { agentId, cfg: runtimeConfig } = resolveCronAgent(job.agentId);
|
||||
const webhookToken = normalizeOptionalString(params.cfg.cron?.webhookToken);
|
||||
|
||||
// Webhook mode requires a URL - fail closed if missing
|
||||
if (mode === "webhook" && !to) {
|
||||
cronLogger.warn(
|
||||
{ jobId: job.id },
|
||||
"cron: failure alert webhook mode requires URL, skipping",
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if (mode === "webhook" && to) {
|
||||
const webhookUrl = normalizeHttpWebhookUrl(to);
|
||||
if (webhookUrl) {
|
||||
await postCronWebhook({
|
||||
webhookUrl,
|
||||
webhookToken,
|
||||
payload: {
|
||||
jobId: job.id,
|
||||
jobName: job.name,
|
||||
message: text,
|
||||
},
|
||||
logContext: { jobId: job.id },
|
||||
blockedLog: "cron: failure alert webhook blocked by SSRF guard",
|
||||
failedLog: "cron: failure alert webhook failed",
|
||||
logger: cronLogger,
|
||||
});
|
||||
} else {
|
||||
cronLogger.warn(
|
||||
{
|
||||
jobId: job.id,
|
||||
webhookUrl: redactWebhookUrl(to),
|
||||
},
|
||||
"cron: failure alert webhook URL is invalid, skipping",
|
||||
);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
const deliverySessionKey = resolveCronDeliverySessionKey(job);
|
||||
const target = await resolveDeliveryTarget(runtimeConfig, agentId, {
|
||||
sendCronFailureAlert: async ({ job, text, channel, to, mode, accountId }) =>
|
||||
await sendGatewayCronFailureAlert({
|
||||
deps: params.deps,
|
||||
logger: cronLogger,
|
||||
resolveCronAgent,
|
||||
webhookToken: params.cfg.cron?.webhookToken,
|
||||
job,
|
||||
text,
|
||||
channel,
|
||||
to,
|
||||
mode,
|
||||
accountId,
|
||||
sessionKey: deliverySessionKey,
|
||||
});
|
||||
if (!target.ok) {
|
||||
throw target.error;
|
||||
}
|
||||
const session = buildOutboundSessionContext({
|
||||
cfg: runtimeConfig,
|
||||
agentId,
|
||||
sessionKey: deliverySessionKey ?? `cron:${job.id}:failure`,
|
||||
});
|
||||
await deliverOutboundPayloads({
|
||||
cfg: runtimeConfig,
|
||||
channel: target.channel,
|
||||
to: target.to,
|
||||
accountId: target.accountId,
|
||||
threadId: target.threadId,
|
||||
payloads: [{ text }],
|
||||
deps: createOutboundSendDeps(params.deps),
|
||||
session,
|
||||
});
|
||||
},
|
||||
}),
|
||||
log: getChildLogger({ module: "cron", storePath }),
|
||||
onEvent: (evt) => {
|
||||
params.broadcast("cron", evt, { dropIfSlow: true });
|
||||
if (evt.action === "finished") {
|
||||
const webhookToken = normalizeOptionalString(params.cfg.cron?.webhookToken);
|
||||
const legacyWebhook = normalizeOptionalString(params.cfg.cron?.webhook);
|
||||
const job = cron.getJob(evt.jobId);
|
||||
const legacyNotify = (job as { notify?: unknown } | undefined)?.notify === true;
|
||||
const webhookTarget = resolveCronWebhookTarget({
|
||||
delivery:
|
||||
job?.delivery && typeof job.delivery.mode === "string"
|
||||
? { mode: job.delivery.mode, to: job.delivery.to }
|
||||
: undefined,
|
||||
legacyNotify,
|
||||
legacyWebhook,
|
||||
dispatchGatewayCronFinishedNotifications({
|
||||
evt,
|
||||
job,
|
||||
deps: params.deps,
|
||||
logger: cronLogger,
|
||||
resolveCronAgent,
|
||||
webhookToken: params.cfg.cron?.webhookToken,
|
||||
legacyWebhook: params.cfg.cron?.webhook,
|
||||
globalFailureDestination: params.cfg.cron?.failureDestination,
|
||||
warnedLegacyWebhookJobs,
|
||||
});
|
||||
|
||||
if (!webhookTarget && job?.delivery?.mode === "webhook") {
|
||||
cronLogger.warn(
|
||||
{
|
||||
jobId: evt.jobId,
|
||||
deliveryTo: job.delivery.to,
|
||||
},
|
||||
"cron: skipped webhook delivery, delivery.to must be a valid http(s) URL",
|
||||
);
|
||||
}
|
||||
|
||||
if (webhookTarget?.source === "legacy" && !warnedLegacyWebhookJobs.has(evt.jobId)) {
|
||||
warnedLegacyWebhookJobs.add(evt.jobId);
|
||||
cronLogger.warn(
|
||||
{
|
||||
jobId: evt.jobId,
|
||||
legacyWebhook: redactWebhookUrl(webhookTarget.url),
|
||||
},
|
||||
"cron: deprecated notify+cron.webhook fallback in use, migrate to delivery.mode=webhook with delivery.to",
|
||||
);
|
||||
}
|
||||
|
||||
if (webhookTarget && evt.summary) {
|
||||
void (async () => {
|
||||
await postCronWebhook({
|
||||
webhookUrl: webhookTarget.url,
|
||||
webhookToken,
|
||||
payload: evt,
|
||||
logContext: { jobId: evt.jobId },
|
||||
blockedLog: "cron: webhook delivery blocked by SSRF guard",
|
||||
failedLog: "cron: webhook delivery failed",
|
||||
logger: cronLogger,
|
||||
});
|
||||
})();
|
||||
}
|
||||
|
||||
if (evt.status === "error" && job) {
|
||||
const isBestEffort = job.delivery?.bestEffort === true;
|
||||
if (!isBestEffort) {
|
||||
const failureMessage = `Cron job "${job.name}" failed: ${evt.error ?? "unknown error"}`;
|
||||
const failureDest = resolveFailureDestination(job, params.cfg.cron?.failureDestination);
|
||||
const deliverySessionKey = resolveCronDeliverySessionKey(job);
|
||||
|
||||
if (failureDest) {
|
||||
// Explicit failureDestination configured — use it
|
||||
const failurePayload = {
|
||||
jobId: job.id,
|
||||
jobName: job.name,
|
||||
message: failureMessage,
|
||||
status: evt.status,
|
||||
error: evt.error,
|
||||
runAtMs: evt.runAtMs,
|
||||
durationMs: evt.durationMs,
|
||||
nextRunAtMs: evt.nextRunAtMs,
|
||||
};
|
||||
|
||||
if (failureDest.mode === "webhook" && failureDest.to) {
|
||||
const webhookUrl = normalizeHttpWebhookUrl(failureDest.to);
|
||||
if (webhookUrl) {
|
||||
void (async () => {
|
||||
await postCronWebhook({
|
||||
webhookUrl,
|
||||
webhookToken,
|
||||
payload: failurePayload,
|
||||
logContext: { jobId: evt.jobId },
|
||||
blockedLog: "cron: failure destination webhook blocked by SSRF guard",
|
||||
failedLog: "cron: failure destination webhook failed",
|
||||
logger: cronLogger,
|
||||
});
|
||||
})();
|
||||
} else {
|
||||
cronLogger.warn(
|
||||
{
|
||||
jobId: evt.jobId,
|
||||
webhookUrl: redactWebhookUrl(failureDest.to),
|
||||
},
|
||||
"cron: failure destination webhook URL is invalid, skipping",
|
||||
);
|
||||
}
|
||||
} else if (failureDest.mode === "announce") {
|
||||
const { agentId, cfg: runtimeConfig } = resolveCronAgent(job.agentId);
|
||||
void sendFailureNotificationAnnounce(
|
||||
params.deps,
|
||||
runtimeConfig,
|
||||
agentId,
|
||||
job.id,
|
||||
{
|
||||
channel: failureDest.channel,
|
||||
to: failureDest.to,
|
||||
accountId: failureDest.accountId,
|
||||
sessionKey: deliverySessionKey,
|
||||
},
|
||||
`⚠️ ${failureMessage}`,
|
||||
);
|
||||
}
|
||||
} else {
|
||||
// No explicit failureDestination — fall back to primary delivery channel (#60608)
|
||||
const primaryPlan = resolveCronDeliveryPlan(job);
|
||||
if (primaryPlan.mode === "announce" && primaryPlan.requested) {
|
||||
const { agentId, cfg: runtimeConfig } = resolveCronAgent(job.agentId);
|
||||
void sendFailureNotificationAnnounce(
|
||||
params.deps,
|
||||
runtimeConfig,
|
||||
agentId,
|
||||
job.id,
|
||||
{
|
||||
channel: primaryPlan.channel,
|
||||
to: primaryPlan.to,
|
||||
accountId: primaryPlan.accountId,
|
||||
sessionKey: deliverySessionKey,
|
||||
},
|
||||
`⚠️ ${failureMessage}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const logPath = resolveCronRunLogPath({
|
||||
storePath,
|
||||
jobId: evt.jobId,
|
||||
|
||||
Reference in New Issue
Block a user