fix(cron): resolve delivery preview server-side

This commit is contained in:
Ayaan Zaidi
2026-04-21 11:17:02 +05:30
parent 4f0a978fc2
commit 0b25a73288
10 changed files with 231 additions and 111 deletions

View File

@@ -12,11 +12,11 @@ import { parsePositiveIntOrUndefined } from "../program/helpers.js";
import { resolveCronCreateSchedule } from "./schedule-options.js";
import {
getCronChannelOptions,
coerceCronDeliveryPreviews,
handleCronCliError,
parseCronToolsAllow,
printCronJson,
printCronList,
resolveCronDeliveryPreviews,
warnIfCronSchedulerDisabled,
} from "./shared.js";
@@ -54,7 +54,7 @@ export function registerCronListCommand(cron: Command) {
return;
}
const jobs = (res as { jobs?: CronJob[] } | null)?.jobs ?? [];
const deliveryPreviews = await resolveCronDeliveryPreviews(jobs);
const deliveryPreviews = coerceCronDeliveryPreviews(res);
printCronList(jobs, defaultRuntime, { deliveryPreviews });
} catch (err) {
handleCronCliError(err);

View File

@@ -4,6 +4,7 @@ import { defaultRuntime } from "../../runtime.js";
import { normalizeLowercaseStringOrEmpty } from "../../shared/string-coerce.js";
import { addGatewayClientOptions, callGatewayFromCli } from "../gateway-rpc.js";
import {
coerceCronDeliveryPreviews,
handleCronCliError,
printCronJson,
printCronShow,
@@ -95,7 +96,8 @@ export function registerCronSimpleCommands(cron: Command) {
printCronJson(job);
return;
}
await printCronShow(job, defaultRuntime);
const deliveryPreviews = coerceCronDeliveryPreviews(res);
printCronShow(job, defaultRuntime, { deliveryPreview: deliveryPreviews.get(job.id) });
} catch (err) {
handleCronCliError(err);
}

View File

@@ -1,7 +1,12 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { CronJob } from "../../cron/types.js";
import type { RuntimeEnv } from "../../runtime.js";
import { getCronChannelOptions, parseCronToolsAllow, printCronList } from "./shared.js";
import {
coerceCronDeliveryPreviews,
getCronChannelOptions,
parseCronToolsAllow,
printCronList,
} from "./shared.js";
const hoisted = vi.hoisted(() => ({
listChannelPluginsMock: vi.fn(),
@@ -234,3 +239,25 @@ describe("parseCronToolsAllow", () => {
expect(parseCronToolsAllow(" , ")).toBeUndefined();
});
});
describe("coerceCronDeliveryPreviews", () => {
it("keeps gateway-provided preview entries", () => {
expect(
coerceCronDeliveryPreviews({
deliveryPreviews: {
job1: { label: "announce -> telegram:123", detail: "explicit" },
},
}).get("job1"),
).toEqual({ label: "announce -> telegram:123", detail: "explicit" });
});
it("drops malformed preview entries", () => {
expect(
coerceCronDeliveryPreviews({
deliveryPreviews: {
job1: { label: "announce -> telegram:123" },
},
}).size,
).toBe(0);
});
});

View File

@@ -1,7 +1,7 @@
import { listChannelPlugins } from "../../channels/plugins/index.js";
import { parseAbsoluteTimeMs } from "../../cron/parse.js";
import { resolveCronStaggerMs } from "../../cron/stagger.js";
import type { CronJob, CronSchedule } from "../../cron/types.js";
import type { CronDeliveryPreview, CronJob, CronSchedule } from "../../cron/types.js";
import { danger } from "../../globals.js";
import { formatDurationHuman } from "../../infra/format-time/format-duration.ts";
import {
@@ -225,99 +225,26 @@ const formatStatus = (job: CronJob) => {
return job.state.lastStatus ?? "idle";
};
export type CronDeliveryPreview = {
label: string;
detail: string;
};
function formatTarget(channel?: string, to?: string | null): string {
if (!channel) {
return "last";
export function coerceCronDeliveryPreviews(value: unknown): Map<string, CronDeliveryPreview> {
const previews =
value && typeof value === "object"
? (value as { deliveryPreviews?: unknown }).deliveryPreviews
: undefined;
if (!previews || typeof previews !== "object") {
return new Map();
}
if (to) {
return `${channel}:${to}`;
}
return channel;
}
function formatDeliveryDetail(params: {
requestedChannel?: string;
resolved: boolean;
sessionKey?: string;
error?: string;
}): string {
if (params.requestedChannel === "last" || !params.requestedChannel) {
if (!params.resolved) {
return params.error
? `last -> no route, will fail-closed: ${params.error}`
: "last -> no route, will fail-closed";
}
return params.sessionKey
? `resolved from last, session ${params.sessionKey}`
: "resolved from last, main session";
}
return params.resolved ? "explicit" : (params.error ?? "unresolved");
}
export async function resolveCronDeliveryPreview(job: CronJob): Promise<CronDeliveryPreview> {
const { resolveCronDeliveryPlan } = await import("../../cron/delivery-plan.js");
const plan = resolveCronDeliveryPlan(job);
if (!plan.requested && plan.mode === "none" && !job.delivery) {
return { label: "not requested", detail: "not requested" };
}
if (plan.mode === "webhook") {
const target = plan.to ? `webhook:${plan.to}` : "webhook";
return { label: target, detail: plan.to ? "webhook" : "webhook target missing" };
}
const requestedChannel = plan.channel ?? "last";
const [{ loadConfig }, { resolveDefaultAgentId }, { resolveDeliveryTarget }] = await Promise.all([
import("../../config/config.js"),
import("../../agents/agent-scope-config.js"),
import("../../cron/isolated-agent/delivery-target.js"),
]);
const cfg = loadConfig();
const agentId = job.agentId?.trim() || resolveDefaultAgentId(cfg);
const resolved = await resolveDeliveryTarget(
cfg,
agentId,
{
channel: requestedChannel,
to: plan.to,
threadId: plan.threadId,
accountId: plan.accountId,
sessionKey: job.sessionKey,
},
{ dryRun: true },
);
if (!resolved.ok) {
return {
label: `${plan.mode} -> ${formatTarget(requestedChannel, plan.to ?? null)}`,
detail: formatDeliveryDetail({
requestedChannel,
resolved: false,
sessionKey: job.sessionKey,
error: resolved.error.message,
}),
};
}
return {
label: `${plan.mode} -> ${formatTarget(resolved.channel, resolved.to)}`,
detail: formatDeliveryDetail({
requestedChannel,
resolved: true,
sessionKey: job.sessionKey,
return new Map(
Object.entries(previews as Record<string, unknown>).flatMap(([jobId, preview]) => {
if (!preview || typeof preview !== "object") {
return [];
}
const record = preview as { label?: unknown; detail?: unknown };
if (typeof record.label !== "string" || typeof record.detail !== "string") {
return [];
}
return [[jobId, { label: record.label, detail: record.detail }]];
}),
};
}
export async function resolveCronDeliveryPreviews(
jobs: CronJob[],
): Promise<Map<string, CronDeliveryPreview>> {
const entries = await Promise.all(
jobs.map(async (job) => [job.id, await resolveCronDeliveryPreview(job)] as const),
);
return new Map(entries);
}
export function printCronList(
@@ -421,8 +348,12 @@ export function printCronList(
}
}
export async function printCronShow(job: CronJob, runtime: RuntimeEnv = defaultRuntime) {
const preview = await resolveCronDeliveryPreview(job);
export function printCronShow(
job: CronJob,
runtime: RuntimeEnv = defaultRuntime,
opts?: { deliveryPreview?: CronDeliveryPreview },
) {
const preview = opts?.deliveryPreview ?? { label: "-", detail: "unavailable" };
runtime.log(`id: ${job.id}`);
runtime.log(`name: ${job.name}`);
runtime.log(`enabled: ${job.enabled ? "yes" : "no"}`);

View File

@@ -0,0 +1,105 @@
import { resolveDefaultAgentId } from "../agents/agent-scope-config.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { resolveCronDeliveryPlan } from "./delivery-plan.js";
import { resolveDeliveryTarget } from "./isolated-agent/delivery-target.js";
import type { CronDeliveryPreview, CronJob } from "./types.js";
function formatTarget(channel?: string, to?: string | null): string {
if (!channel) {
return "last";
}
if (to) {
return `${channel}:${to}`;
}
return channel;
}
function formatDeliveryDetail(params: {
requestedChannel?: string;
resolved: boolean;
sessionKey?: string;
error?: string;
}): string {
if (params.requestedChannel === "last" || !params.requestedChannel) {
if (!params.resolved) {
return params.error
? `last -> no route, will fail-closed: ${params.error}`
: "last -> no route, will fail-closed";
}
return params.sessionKey
? `resolved from last, session ${params.sessionKey}`
: "resolved from last, main session";
}
return params.resolved ? "explicit" : (params.error ?? "unresolved");
}
export async function resolveCronDeliveryPreview(params: {
cfg: OpenClawConfig;
defaultAgentId?: string;
job: CronJob;
}): Promise<CronDeliveryPreview> {
const plan = resolveCronDeliveryPlan(params.job);
if (!plan.requested && plan.mode === "none" && !params.job.delivery) {
return { label: "not requested", detail: "not requested" };
}
if (plan.mode === "webhook") {
const target = plan.to ? `webhook:${plan.to}` : "webhook";
return { label: target, detail: plan.to ? "webhook" : "webhook target missing" };
}
const requestedChannel = plan.channel ?? "last";
const agentId =
params.job.agentId?.trim() || params.defaultAgentId || resolveDefaultAgentId(params.cfg);
const resolved = await resolveDeliveryTarget(
params.cfg,
agentId,
{
channel: requestedChannel,
to: plan.to,
threadId: plan.threadId,
accountId: plan.accountId,
sessionKey: params.job.sessionKey,
},
{ dryRun: true },
);
if (!resolved.ok) {
return {
label: `${plan.mode} -> ${formatTarget(requestedChannel, plan.to ?? null)}`,
detail: formatDeliveryDetail({
requestedChannel,
resolved: false,
sessionKey: params.job.sessionKey,
error: resolved.error.message,
}),
};
}
return {
label: `${plan.mode} -> ${formatTarget(resolved.channel, resolved.to)}`,
detail: formatDeliveryDetail({
requestedChannel,
resolved: true,
sessionKey: params.job.sessionKey,
}),
};
}
export async function resolveCronDeliveryPreviews(params: {
cfg: OpenClawConfig;
defaultAgentId?: string;
jobs: CronJob[];
}): Promise<Record<string, CronDeliveryPreview>> {
const entries = await Promise.all(
params.jobs.map(
async (job) =>
[
job.id,
await resolveCronDeliveryPreview({
cfg: params.cfg,
defaultAgentId: params.defaultAgentId,
job,
}),
] as const,
),
);
return Object.fromEntries(entries);
}

View File

@@ -8,6 +8,7 @@ import {
DEFAULT_CRON_RUN_LOG_MAX_BYTES,
getPendingCronRunLogWriteCountForTests,
readCronRunLogEntries,
readCronRunLogEntriesPage,
resolveCronRunLogPruneOptions,
resolveCronRunLogPath,
} from "./run-log.js";
@@ -237,6 +238,48 @@ describe("cron run log", () => {
});
});
it("does not include raw delivery targets in run-log search", async () => {
await withRunLogDir("openclaw-cron-log-target-query-", async (dir) => {
const logPath = path.join(dir, "runs", "job-1.jsonl");
await fs.mkdir(path.dirname(logPath), { recursive: true });
await fs.writeFile(
logPath,
JSON.stringify({
ts: 2,
jobId: "job-1",
action: "finished",
status: "ok",
summary: "done",
delivery: {
intended: { channel: "last", to: null, source: "last" },
resolved: { ok: true, channel: "telegram", to: "-100", source: "last" },
messageToolSentTo: [{ channel: "telegram", to: "-100" }],
},
}) + "\n",
"utf-8",
);
expect(
(
await readCronRunLogEntriesPage(logPath, {
limit: 10,
jobId: "job-1",
query: "telegram",
})
).entries,
).toHaveLength(1);
expect(
(
await readCronRunLogEntriesPage(logPath, {
limit: 10,
jobId: "job-1",
query: "-100",
})
).entries,
).toEqual([]);
});
});
it("reads telemetry fields", async () => {
await withRunLogDir("openclaw-cron-log-telemetry-", async (dir) => {
const logPath = path.join(dir, "runs", "job-1.jsonl");

View File

@@ -390,13 +390,8 @@ export async function readCronRunLogEntriesPage(
entry.error ?? "",
entry.jobId,
entry.delivery?.intended?.channel ?? "",
entry.delivery?.intended?.to ?? "",
entry.delivery?.resolved?.channel ?? "",
entry.delivery?.resolved?.to ?? "",
...(entry.delivery?.messageToolSentTo ?? []).flatMap((target) => [
target.channel,
target.to ?? "",
]),
...(entry.delivery?.messageToolSentTo ?? []).map((target) => target.channel),
].join(" "),
});
const sorted =
@@ -460,13 +455,8 @@ export async function readCronRunLogEntriesPageAll(
entry.jobId,
jobName,
entry.delivery?.intended?.channel ?? "",
entry.delivery?.intended?.to ?? "",
entry.delivery?.resolved?.channel ?? "",
entry.delivery?.resolved?.to ?? "",
...(entry.delivery?.messageToolSentTo ?? []).flatMap((target) => [
target.channel,
target.to ?? "",
]),
...(entry.delivery?.messageToolSentTo ?? []).map((target) => target.channel),
].join(" ");
},
});

View File

@@ -69,6 +69,11 @@ export type CronDeliveryTrace = {
delivered?: boolean;
};
export type CronDeliveryPreview = {
label: string;
detail: string;
};
export type CronUsageSummary = {
input_tokens?: number;
output_tokens?: number;

View File

@@ -1,6 +1,7 @@
import { listPotentialConfiguredChannelIds } from "../../channels/config-presence.js";
import { loadConfig } from "../../config/config.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { resolveCronDeliveryPreviews } from "../../cron/delivery-preview.js";
import { normalizeCronJobCreate, normalizeCronJobPatch } from "../../cron/normalize.js";
import {
readCronRunLogEntriesPage,
@@ -161,7 +162,12 @@ export const cronHandlers: GatewayRequestHandlers = {
sortBy: p.sortBy,
sortDir: p.sortDir,
});
respond(true, page, undefined);
const deliveryPreviews = await resolveCronDeliveryPreviews({
cfg: loadConfig(),
defaultAgentId: context.cron.getDefaultAgentId(),
jobs: page.jobs,
});
respond(true, { ...page, deliveryPreviews }, undefined);
},
"cron.status": async ({ params, respond, context }) => {
if (!validateCronStatusParams(params)) {

View File

@@ -275,7 +275,8 @@ describe("gateway server cron", () => {
delivery: { mode: "webhook", to: "https://example.invalid/cron-finished" },
});
expect(addRes.ok).toBe(true);
expect(typeof (addRes.payload as { id?: unknown } | null)?.id).toBe("string");
const dailyJobId = (addRes.payload as { id?: unknown } | null)?.id;
expect(typeof dailyJobId).toBe("string");
const listRes = await rpcReq(ws, "cron.list", {
includeDisabled: true,
@@ -288,6 +289,16 @@ describe("gateway server cron", () => {
expect(
((jobs as Array<{ delivery?: { mode?: unknown } }>)[0]?.delivery?.mode as string) ?? "",
).toBe("webhook");
expect(
(
listRes.payload as {
deliveryPreviews?: Record<string, { label?: unknown; detail?: unknown }>;
} | null
)?.deliveryPreviews?.[String(dailyJobId)],
).toEqual({
label: "webhook:https://example.invalid/cron-finished",
detail: "webhook",
});
const routeAtMs = Date.now() - 1;
const routeRes = await rpcReq(ws, "cron.add", {