fix(cron): log delivery target trace

This commit is contained in:
Ayaan Zaidi
2026-04-21 10:26:26 +05:30
parent d083702a7b
commit 4c8299ca3d
11 changed files with 245 additions and 18 deletions

View File

@@ -382,7 +382,7 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
meta: { agentMeta: { usage: { input: 10, output: 20 } } },
});
await runCronIsolatedAgentTurn({
const result = await runCronIsolatedAgentTurn({
...params,
job: job as never,
});
@@ -394,6 +394,15 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
skipMessagingToolDelivery: true,
}),
);
expect(result.delivery).toEqual(
expect.objectContaining({
intended: { channel: "telegram", to: "123", source: "explicit" },
resolved: { ok: true, channel: "telegram", to: "123", source: "explicit" },
messageToolSentTo: [{ channel: "telegram", to: "123" }],
fallbackUsed: false,
delivered: true,
}),
);
});
it("does not mark message tool delivery as matched when cron target resolution failed", async () => {
@@ -419,7 +428,7 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
meta: { agentMeta: { usage: { input: 10, output: 20 } } },
});
await runCronIsolatedAgentTurn(makeParams());
const result = await runCronIsolatedAgentTurn(makeParams());
expect(dispatchCronDeliveryMock).toHaveBeenCalledTimes(1);
expect(dispatchCronDeliveryMock.mock.calls[0]?.[0]).toEqual(
@@ -429,6 +438,19 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
unverifiedMessagingToolDelivery: true,
}),
);
expect(result.delivery).toEqual(
expect.objectContaining({
intended: { channel: "last", to: null, source: "last" },
resolved: expect.objectContaining({
ok: false,
source: "last",
error: "sessionKey is required to resolve delivery.channel=last",
}),
messageToolSentTo: [{ channel: "telegram", to: "123" }],
fallbackUsed: false,
delivered: false,
}),
);
});
it("marks no-deliver runs delivered when the message tool sends to the current target", async () => {

View File

@@ -371,9 +371,12 @@ function resetRunOutcomeMocks(): void {
resolveCronDeliveryPlanMock.mockReturnValue({ requested: false, mode: "none" });
resolveDeliveryTargetMock.mockReset();
resolveDeliveryTargetMock.mockResolvedValue({
ok: true,
channel: "discord",
to: undefined,
to: "test-target",
accountId: undefined,
threadId: undefined,
mode: "explicit",
error: undefined,
});
dispatchCronDeliveryMock.mockReset();

View File

@@ -1,11 +1,18 @@
import { hasAnyAuthProfileStoreSource } from "../../agents/auth-profiles/source-check.js";
import type { MessagingToolSend } from "../../agents/pi-embedded-messaging.types.js";
import type { SkillSnapshot } from "../../agents/skills.js";
import type { ThinkLevel } from "../../auto-reply/thinking.js";
import type { CliDeps } from "../../cli/outbound-send-deps.js";
import type { AgentDefaultsConfig } from "../../config/types.agent-defaults.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { resolveCronDeliveryPlan } from "../delivery-plan.js";
import type { CronJob, CronRunTelemetry } from "../types.js";
import { resolveCronDeliveryPlan, type CronDeliveryPlan } from "../delivery-plan.js";
import type {
CronDeliveryTrace,
CronDeliveryTraceMessageTarget,
CronDeliveryTraceTarget,
CronJob,
CronRunTelemetry,
} from "../types.js";
import {
isHeartbeatOnlyResponse,
resolveCronPayloadOutcome,
@@ -120,6 +127,85 @@ type CronModelCatalogRuntime = typeof import("./run-model-catalog.runtime.js");
type CronDeliveryRuntime = typeof import("./run-delivery.runtime.js");
type ResolvedCronDeliveryTarget = Awaited<ReturnType<CronDeliveryRuntime["resolveDeliveryTarget"]>>;
function normalizeCronTraceTarget(
target: CronDeliveryTraceTarget | undefined,
): CronDeliveryTraceTarget | undefined {
if (!target) {
return undefined;
}
return {
...(target.channel ? { channel: target.channel } : {}),
...(target.to !== undefined ? { to: target.to } : {}),
...(target.accountId ? { accountId: target.accountId } : {}),
...(target.threadId !== undefined ? { threadId: target.threadId } : {}),
...(target.source ? { source: target.source } : {}),
};
}
function normalizeMessagingToolTarget(
target: MessagingToolSend,
): CronDeliveryTraceMessageTarget | undefined {
const channel = target.provider?.trim();
if (!channel) {
return undefined;
}
return {
channel,
...(target.to ? { to: target.to } : {}),
...(target.accountId ? { accountId: target.accountId } : {}),
...(target.threadId ? { threadId: target.threadId } : {}),
};
}
function buildCronDeliveryTrace(params: {
deliveryPlan: CronDeliveryPlan;
resolvedDelivery: ResolvedCronDeliveryTarget;
messagingToolSentTargets: MessagingToolSend[];
fallbackUsed: boolean;
delivered: boolean;
}): CronDeliveryTrace {
const intended = normalizeCronTraceTarget({
channel: params.deliveryPlan.channel ?? "last",
to: params.deliveryPlan.to ?? null,
accountId: params.deliveryPlan.accountId,
threadId: params.deliveryPlan.threadId,
source:
params.deliveryPlan.channel === "last" || !params.deliveryPlan.channel ? "last" : "explicit",
});
const resolved = params.resolvedDelivery.ok
? {
ok: true,
...normalizeCronTraceTarget({
channel: params.resolvedDelivery.channel,
to: params.resolvedDelivery.to,
accountId: params.resolvedDelivery.accountId,
threadId: params.resolvedDelivery.threadId,
source: params.resolvedDelivery.mode === "implicit" ? "last" : "explicit",
}),
}
: {
ok: false,
...normalizeCronTraceTarget({
channel: params.resolvedDelivery.channel,
to: params.resolvedDelivery.to ?? null,
accountId: params.resolvedDelivery.accountId,
threadId: params.resolvedDelivery.threadId,
source: params.resolvedDelivery.mode === "implicit" ? "last" : "explicit",
}),
error: params.resolvedDelivery.error.message,
};
const messageToolSentTo = params.messagingToolSentTargets
.map((target) => normalizeMessagingToolTarget(target))
.filter((target): target is CronDeliveryTraceMessageTarget => Boolean(target));
return {
...(intended ? { intended } : {}),
resolved,
...(messageToolSentTo.length > 0 ? { messageToolSentTo } : {}),
fallbackUsed: params.fallbackUsed,
delivered: params.delivered,
};
}
function resolveCronToolPolicy(params: { deliveryMode: "announce" | "webhook" | "none" }) {
const enableMessageTool = params.deliveryMode !== "webhook";
return {
@@ -233,6 +319,7 @@ type PreparedCronRunContext = {
persistSessionEntry: PersistCronSessionEntry;
withRunSession: WithRunSession;
agentPayload: Extract<CronJob["payload"], { kind: "agentTurn" }> | null;
deliveryPlan: CronDeliveryPlan;
resolvedDelivery: ResolvedCronDeliveryTarget;
deliveryRequested: boolean;
toolPolicy: ReturnType<typeof resolveCronToolPolicy>;
@@ -403,11 +490,12 @@ async function prepareCronRunContext(params: {
input.job.payload.kind === "agentTurn" ? input.job.payload.timeoutSeconds : undefined,
});
const agentPayload = input.job.payload.kind === "agentTurn" ? input.job.payload : null;
const { deliveryRequested, resolvedDelivery, toolPolicy } = await resolveCronDeliveryContext({
cfg: cfgWithAgentDefaults,
job: input.job,
agentId,
});
const { deliveryPlan, deliveryRequested, resolvedDelivery, toolPolicy } =
await resolveCronDeliveryContext({
cfg: cfgWithAgentDefaults,
job: input.job,
agentId,
});
const { formattedTime, timeLine } = resolveCronStyleNow(input.cfg, now);
const base = `[cron:${input.job.id} ${input.job.name}] ${input.message}`.trim();
@@ -518,6 +606,7 @@ async function prepareCronRunContext(params: {
persistSessionEntry,
withRunSession,
agentPayload,
deliveryPlan,
resolvedDelivery,
deliveryRequested,
toolPolicy,
@@ -641,7 +730,11 @@ async function finalizeCronRun(params: {
finalAssistantVisibleText: finalRunResult.meta?.finalAssistantVisibleText,
preferFinalAssistantVisibleText: prepared.resolvedDelivery.channel === "telegram",
});
const resolveRunOutcome = (result?: { delivered?: boolean; deliveryAttempted?: boolean }) =>
const resolveRunOutcome = (result?: {
delivered?: boolean;
deliveryAttempted?: boolean;
delivery?: CronDeliveryTrace;
}) =>
prepared.withRunSession({
status: hasFatalErrorPayload ? "error" : "ok",
...(hasFatalErrorPayload
@@ -651,6 +744,7 @@ async function finalizeCronRun(params: {
outputText,
delivered: result?.delivered,
deliveryAttempted: result?.deliveryAttempted,
delivery: result?.delivery,
...telemetry,
});
@@ -702,11 +796,19 @@ async function finalizeCronRun(params: {
abortReason: params.abortReason,
withRunSession: prepared.withRunSession,
});
const deliveryTrace = buildCronDeliveryTrace({
deliveryPlan: prepared.deliveryPlan,
resolvedDelivery: prepared.resolvedDelivery,
messagingToolSentTargets,
fallbackUsed: deliveryResult.deliveryAttempted && !skipMessagingToolDelivery,
delivered: deliveryResult.delivered,
});
if (deliveryResult.result) {
const resultWithDeliveryMeta: RunCronAgentTurnResult = {
...deliveryResult.result,
deliveryAttempted:
deliveryResult.result.deliveryAttempted ?? deliveryResult.deliveryAttempted,
delivery: deliveryTrace,
};
if (!hasFatalErrorPayload || deliveryResult.result.status !== "ok") {
return resultWithDeliveryMeta;
@@ -714,6 +816,7 @@ async function finalizeCronRun(params: {
return resolveRunOutcome({
delivered: deliveryResult.result.delivered,
deliveryAttempted: resultWithDeliveryMeta.deliveryAttempted,
delivery: deliveryTrace,
});
}
summary = deliveryResult.summary;
@@ -721,6 +824,7 @@ async function finalizeCronRun(params: {
return resolveRunOutcome({
delivered: deliveryResult.delivered,
deliveryAttempted: deliveryResult.deliveryAttempted,
delivery: deliveryTrace,
});
}

View File

@@ -1,4 +1,4 @@
import type { CronRunOutcome, CronRunTelemetry } from "../types.js";
import type { CronDeliveryTrace, CronRunOutcome, CronRunTelemetry } from "../types.js";
export type RunCronAgentTurnResult = {
/** Last non-empty agent text output (not truncated). */
@@ -16,5 +16,6 @@ export type RunCronAgentTurnResult = {
* cannot guarantee a final delivery ack synchronously.
*/
deliveryAttempted?: boolean;
delivery?: CronDeliveryTrace;
} & CronRunOutcome &
CronRunTelemetry;

View File

@@ -209,6 +209,13 @@ describe("cron run log", () => {
delivered: true,
deliveryStatus: "not-delivered",
deliveryError: "announce failed",
delivery: {
intended: { channel: "last", to: null, source: "last" },
resolved: { ok: true, channel: "telegram", to: "-100", source: "last" },
messageToolSentTo: [{ channel: "telegram", to: "-100" }],
fallbackUsed: false,
delivered: true,
},
}),
].join("\n") + "\n",
"utf-8",
@@ -220,6 +227,13 @@ describe("cron run log", () => {
expect(entries[0]?.delivered).toBe(true);
expect(entries[0]?.deliveryStatus).toBe("not-delivered");
expect(entries[0]?.deliveryError).toBe("announce failed");
expect(entries[0]?.delivery).toEqual({
intended: { channel: "last", to: null, source: "last" },
resolved: { ok: true, channel: "telegram", to: "-100", source: "last" },
messageToolSentTo: [{ channel: "telegram", to: "-100" }],
fallbackUsed: false,
delivered: true,
});
});
});

View File

@@ -8,7 +8,12 @@ import {
normalizeOptionalString,
normalizeStringifiedOptionalString,
} from "../shared/string-coerce.js";
import type { CronDeliveryStatus, CronRunStatus, CronRunTelemetry } from "./types.js";
import type {
CronDeliveryStatus,
CronDeliveryTrace,
CronRunStatus,
CronRunTelemetry,
} from "./types.js";
export type CronRunLogEntry = {
ts: number;
@@ -20,6 +25,7 @@ export type CronRunLogEntry = {
delivered?: boolean;
deliveryStatus?: CronDeliveryStatus;
deliveryError?: string;
delivery?: CronDeliveryTrace;
sessionId?: string;
sessionKey?: string;
runAtMs?: number;
@@ -319,6 +325,9 @@ function parseAllRunLogEntries(raw: string, opts?: { jobId?: string }): CronRunL
if (typeof obj.deliveryError === "string") {
entry.deliveryError = obj.deliveryError;
}
if (obj.delivery && typeof obj.delivery === "object") {
entry.delivery = obj.delivery;
}
if (typeof obj.sessionId === "string" && obj.sessionId.trim().length > 0) {
entry.sessionId = obj.sessionId;
}
@@ -375,7 +384,20 @@ export async function readCronRunLogEntriesPage(
statuses,
deliveryStatuses,
query,
queryTextForEntry: (entry) => [entry.summary ?? "", entry.error ?? "", entry.jobId].join(" "),
queryTextForEntry: (entry) =>
[
entry.summary ?? "",
entry.error ?? "",
entry.jobId,
entry.delivery?.intended?.channel ?? "",
entry.delivery?.intended?.to ?? "",
entry.delivery?.resolved?.channel ?? "",
entry.delivery?.resolved?.to ?? "",
...(entry.delivery?.messageToolSentTo ?? []).flatMap((target) => [
target.channel,
target.to ?? "",
]),
].join(" "),
});
const sorted =
sortDir === "asc"
@@ -432,7 +454,20 @@ export async function readCronRunLogEntriesPageAll(
query,
queryTextForEntry: (entry) => {
const jobName = opts.jobNameById?.[entry.jobId] ?? "";
return [entry.summary ?? "", entry.error ?? "", entry.jobId, jobName].join(" ");
return [
entry.summary ?? "",
entry.error ?? "",
entry.jobId,
jobName,
entry.delivery?.intended?.channel ?? "",
entry.delivery?.intended?.to ?? "",
entry.delivery?.resolved?.channel ?? "",
entry.delivery?.resolved?.to ?? "",
...(entry.delivery?.messageToolSentTo ?? []).flatMap((target) => [
target.channel,
target.to ?? "",
]),
].join(" ");
},
});
const sorted =

View File

@@ -643,6 +643,7 @@ async function finishPreparedManualRun(
delivered: coreResult.delivered,
deliveryStatus: job.state.lastDeliveryStatus,
deliveryError: job.state.lastDeliveryError,
delivery: coreResult.delivery,
sessionId: coreResult.sessionId,
sessionKey: coreResult.sessionKey,
runAtMs: startedAt,

View File

@@ -2,6 +2,7 @@ import type { CronConfig } from "../../config/types.cron.js";
import type { HeartbeatRunResult, HeartbeatWakeRequest } from "../../infra/heartbeat-wake.js";
import type {
CronDeliveryStatus,
CronDeliveryTrace,
CronJob,
CronJobCreate,
CronJobPatch,
@@ -23,6 +24,7 @@ export type CronEvent = {
delivered?: boolean;
deliveryStatus?: CronDeliveryStatus;
deliveryError?: string;
delivery?: CronDeliveryTrace;
sessionId?: string;
sessionKey?: string;
nextRunAtMs?: number;
@@ -100,6 +102,7 @@ export type CronServiceDeps = {
* if the final per-message ack status is uncertain.
*/
deliveryAttempted?: boolean;
delivery?: CronDeliveryTrace;
} & CronRunOutcome &
CronRunTelemetry
>;

View File

@@ -14,6 +14,7 @@ import { createCronExecutionId } from "../run-id.js";
import { sweepCronRunSessions } from "../session-reaper.js";
import type {
CronDeliveryStatus,
CronDeliveryTrace,
CronJob,
CronMessageChannel,
CronRunOutcome,
@@ -1134,7 +1135,12 @@ export async function executeJobCore(
job: CronJob,
abortSignal?: AbortSignal,
): Promise<
CronRunOutcome & CronRunTelemetry & { delivered?: boolean; deliveryAttempted?: boolean }
CronRunOutcome &
CronRunTelemetry & {
delivered?: boolean;
deliveryAttempted?: boolean;
delivery?: CronDeliveryTrace;
}
> {
const resolveAbortError = () => ({
status: "error" as const,
@@ -1178,7 +1184,12 @@ async function executeMainSessionCronJob(
abortSignal: AbortSignal | undefined,
waitWithAbort: (ms: number) => Promise<void>,
): Promise<
CronRunOutcome & CronRunTelemetry & { delivered?: boolean; deliveryAttempted?: boolean }
CronRunOutcome &
CronRunTelemetry & {
delivered?: boolean;
deliveryAttempted?: boolean;
delivery?: CronDeliveryTrace;
}
> {
const text = resolveJobPayloadTextForMain(job);
if (!text) {
@@ -1275,7 +1286,12 @@ async function executeDetachedCronJob(
abortSignal: AbortSignal | undefined,
resolveAbortError: () => { status: "error"; error: string },
): Promise<
CronRunOutcome & CronRunTelemetry & { delivered?: boolean; deliveryAttempted?: boolean }
CronRunOutcome &
CronRunTelemetry & {
delivered?: boolean;
deliveryAttempted?: boolean;
delivery?: CronDeliveryTrace;
}
> {
if (job.payload.kind !== "agentTurn") {
return { status: "skipped", error: "isolated job requires payload.kind=agentTurn" };
@@ -1300,6 +1316,7 @@ async function executeDetachedCronJob(
summary: res.summary,
delivered: res.delivered,
deliveryAttempted: res.deliveryAttempted,
delivery: res.delivery,
sessionId: res.sessionId,
sessionKey: res.sessionKey,
model: res.model,
@@ -1330,6 +1347,7 @@ export async function executeJob(
let coreResult: {
status: CronRunStatus;
delivered?: boolean;
delivery?: CronDeliveryTrace;
} & CronRunOutcome &
CronRunTelemetry;
try {
@@ -1362,6 +1380,7 @@ function emitJobFinished(
result: {
status: CronRunStatus;
delivered?: boolean;
delivery?: CronDeliveryTrace;
} & CronRunOutcome &
CronRunTelemetry,
runAtMs: number,
@@ -1375,6 +1394,7 @@ function emitJobFinished(
delivered: result.delivered,
deliveryStatus: job.state.lastDeliveryStatus,
deliveryError: job.state.lastDeliveryError,
delivery: result.delivery,
sessionId: result.sessionId,
sessionKey: result.sessionKey,
runAtMs,

View File

@@ -46,6 +46,29 @@ export type CronDeliveryPatch = Partial<CronDelivery>;
export type CronRunStatus = "ok" | "error" | "skipped";
export type CronDeliveryStatus = "delivered" | "not-delivered" | "unknown" | "not-requested";
export type CronDeliveryTraceTarget = {
channel?: string;
to?: string | null;
accountId?: string;
threadId?: string | number;
source?: "explicit" | "last";
};
export type CronDeliveryTraceMessageTarget = {
channel: string;
to?: string;
accountId?: string;
threadId?: string;
};
export type CronDeliveryTrace = {
intended?: CronDeliveryTraceTarget;
resolved?: CronDeliveryTraceTarget & { ok: boolean; error?: string };
messageToolSentTo?: CronDeliveryTraceMessageTarget[];
fallbackUsed?: boolean;
delivered?: boolean;
};
export type CronUsageSummary = {
input_tokens?: number;
output_tokens?: number;

View File

@@ -562,6 +562,7 @@ export function buildGatewayCronService(params: {
delivered: evt.delivered,
deliveryStatus: evt.deliveryStatus,
deliveryError: evt.deliveryError,
delivery: evt.delivery,
sessionId: evt.sessionId,
sessionKey: evt.sessionKey,
runAtMs: evt.runAtMs,