From 4c8299ca3d4f719e84e55e11f5c6c7b452ceb392 Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Tue, 21 Apr 2026 10:26:26 +0530 Subject: [PATCH] fix(cron): log delivery target trace --- .../run.message-tool-policy.test.ts | 26 +++- src/cron/isolated-agent/run.test-harness.ts | 5 +- src/cron/isolated-agent/run.ts | 120 ++++++++++++++++-- src/cron/isolated-agent/run.types.ts | 3 +- src/cron/run-log.test.ts | 14 ++ src/cron/run-log.ts | 41 +++++- src/cron/service/ops.ts | 1 + src/cron/service/state.ts | 3 + src/cron/service/timer.ts | 26 +++- src/cron/types.ts | 23 ++++ src/gateway/server-cron.ts | 1 + 11 files changed, 245 insertions(+), 18 deletions(-) diff --git a/src/cron/isolated-agent/run.message-tool-policy.test.ts b/src/cron/isolated-agent/run.message-tool-policy.test.ts index 4f62bc9bd92..e4487991e00 100644 --- a/src/cron/isolated-agent/run.message-tool-policy.test.ts +++ b/src/cron/isolated-agent/run.message-tool-policy.test.ts @@ -382,7 +382,7 @@ describe("runCronIsolatedAgentTurn message tool policy", () => { meta: { agentMeta: { usage: { input: 10, output: 20 } } }, }); - await runCronIsolatedAgentTurn({ + const result = await runCronIsolatedAgentTurn({ ...params, job: job as never, }); @@ -394,6 +394,15 @@ describe("runCronIsolatedAgentTurn message tool policy", () => { skipMessagingToolDelivery: true, }), ); + expect(result.delivery).toEqual( + expect.objectContaining({ + intended: { channel: "telegram", to: "123", source: "explicit" }, + resolved: { ok: true, channel: "telegram", to: "123", source: "explicit" }, + messageToolSentTo: [{ channel: "telegram", to: "123" }], + fallbackUsed: false, + delivered: true, + }), + ); }); it("does not mark message tool delivery as matched when cron target resolution failed", async () => { @@ -419,7 +428,7 @@ describe("runCronIsolatedAgentTurn message tool policy", () => { meta: { agentMeta: { usage: { input: 10, output: 20 } } }, }); - await runCronIsolatedAgentTurn(makeParams()); + const result = await runCronIsolatedAgentTurn(makeParams()); expect(dispatchCronDeliveryMock).toHaveBeenCalledTimes(1); expect(dispatchCronDeliveryMock.mock.calls[0]?.[0]).toEqual( @@ -429,6 +438,19 @@ describe("runCronIsolatedAgentTurn message tool policy", () => { unverifiedMessagingToolDelivery: true, }), ); + expect(result.delivery).toEqual( + expect.objectContaining({ + intended: { channel: "last", to: null, source: "last" }, + resolved: expect.objectContaining({ + ok: false, + source: "last", + error: "sessionKey is required to resolve delivery.channel=last", + }), + messageToolSentTo: [{ channel: "telegram", to: "123" }], + fallbackUsed: false, + delivered: false, + }), + ); }); it("marks no-deliver runs delivered when the message tool sends to the current target", async () => { diff --git a/src/cron/isolated-agent/run.test-harness.ts b/src/cron/isolated-agent/run.test-harness.ts index 31c28312ba9..51a459e0186 100644 --- a/src/cron/isolated-agent/run.test-harness.ts +++ b/src/cron/isolated-agent/run.test-harness.ts @@ -371,9 +371,12 @@ function resetRunOutcomeMocks(): void { resolveCronDeliveryPlanMock.mockReturnValue({ requested: false, mode: "none" }); resolveDeliveryTargetMock.mockReset(); resolveDeliveryTargetMock.mockResolvedValue({ + ok: true, channel: "discord", - to: undefined, + to: "test-target", accountId: undefined, + threadId: undefined, + mode: "explicit", error: undefined, }); dispatchCronDeliveryMock.mockReset(); diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index ca1d6aaba5f..c8b8014451e 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -1,11 +1,18 @@ import { hasAnyAuthProfileStoreSource } from "../../agents/auth-profiles/source-check.js"; +import type { MessagingToolSend } from "../../agents/pi-embedded-messaging.types.js"; import type { SkillSnapshot } from "../../agents/skills.js"; import type { ThinkLevel } from "../../auto-reply/thinking.js"; import type { CliDeps } from "../../cli/outbound-send-deps.js"; import type { AgentDefaultsConfig } from "../../config/types.agent-defaults.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; -import { resolveCronDeliveryPlan } from "../delivery-plan.js"; -import type { CronJob, CronRunTelemetry } from "../types.js"; +import { resolveCronDeliveryPlan, type CronDeliveryPlan } from "../delivery-plan.js"; +import type { + CronDeliveryTrace, + CronDeliveryTraceMessageTarget, + CronDeliveryTraceTarget, + CronJob, + CronRunTelemetry, +} from "../types.js"; import { isHeartbeatOnlyResponse, resolveCronPayloadOutcome, @@ -120,6 +127,85 @@ type CronModelCatalogRuntime = typeof import("./run-model-catalog.runtime.js"); type CronDeliveryRuntime = typeof import("./run-delivery.runtime.js"); type ResolvedCronDeliveryTarget = Awaited>; +function normalizeCronTraceTarget( + target: CronDeliveryTraceTarget | undefined, +): CronDeliveryTraceTarget | undefined { + if (!target) { + return undefined; + } + return { + ...(target.channel ? { channel: target.channel } : {}), + ...(target.to !== undefined ? { to: target.to } : {}), + ...(target.accountId ? { accountId: target.accountId } : {}), + ...(target.threadId !== undefined ? { threadId: target.threadId } : {}), + ...(target.source ? { source: target.source } : {}), + }; +} + +function normalizeMessagingToolTarget( + target: MessagingToolSend, +): CronDeliveryTraceMessageTarget | undefined { + const channel = target.provider?.trim(); + if (!channel) { + return undefined; + } + return { + channel, + ...(target.to ? { to: target.to } : {}), + ...(target.accountId ? { accountId: target.accountId } : {}), + ...(target.threadId ? { threadId: target.threadId } : {}), + }; +} + +function buildCronDeliveryTrace(params: { + deliveryPlan: CronDeliveryPlan; + resolvedDelivery: ResolvedCronDeliveryTarget; + messagingToolSentTargets: MessagingToolSend[]; + fallbackUsed: boolean; + delivered: boolean; +}): CronDeliveryTrace { + const intended = normalizeCronTraceTarget({ + channel: params.deliveryPlan.channel ?? "last", + to: params.deliveryPlan.to ?? null, + accountId: params.deliveryPlan.accountId, + threadId: params.deliveryPlan.threadId, + source: + params.deliveryPlan.channel === "last" || !params.deliveryPlan.channel ? "last" : "explicit", + }); + const resolved = params.resolvedDelivery.ok + ? { + ok: true, + ...normalizeCronTraceTarget({ + channel: params.resolvedDelivery.channel, + to: params.resolvedDelivery.to, + accountId: params.resolvedDelivery.accountId, + threadId: params.resolvedDelivery.threadId, + source: params.resolvedDelivery.mode === "implicit" ? "last" : "explicit", + }), + } + : { + ok: false, + ...normalizeCronTraceTarget({ + channel: params.resolvedDelivery.channel, + to: params.resolvedDelivery.to ?? null, + accountId: params.resolvedDelivery.accountId, + threadId: params.resolvedDelivery.threadId, + source: params.resolvedDelivery.mode === "implicit" ? "last" : "explicit", + }), + error: params.resolvedDelivery.error.message, + }; + const messageToolSentTo = params.messagingToolSentTargets + .map((target) => normalizeMessagingToolTarget(target)) + .filter((target): target is CronDeliveryTraceMessageTarget => Boolean(target)); + return { + ...(intended ? { intended } : {}), + resolved, + ...(messageToolSentTo.length > 0 ? { messageToolSentTo } : {}), + fallbackUsed: params.fallbackUsed, + delivered: params.delivered, + }; +} + function resolveCronToolPolicy(params: { deliveryMode: "announce" | "webhook" | "none" }) { const enableMessageTool = params.deliveryMode !== "webhook"; return { @@ -233,6 +319,7 @@ type PreparedCronRunContext = { persistSessionEntry: PersistCronSessionEntry; withRunSession: WithRunSession; agentPayload: Extract | null; + deliveryPlan: CronDeliveryPlan; resolvedDelivery: ResolvedCronDeliveryTarget; deliveryRequested: boolean; toolPolicy: ReturnType; @@ -403,11 +490,12 @@ async function prepareCronRunContext(params: { input.job.payload.kind === "agentTurn" ? input.job.payload.timeoutSeconds : undefined, }); const agentPayload = input.job.payload.kind === "agentTurn" ? input.job.payload : null; - const { deliveryRequested, resolvedDelivery, toolPolicy } = await resolveCronDeliveryContext({ - cfg: cfgWithAgentDefaults, - job: input.job, - agentId, - }); + const { deliveryPlan, deliveryRequested, resolvedDelivery, toolPolicy } = + await resolveCronDeliveryContext({ + cfg: cfgWithAgentDefaults, + job: input.job, + agentId, + }); const { formattedTime, timeLine } = resolveCronStyleNow(input.cfg, now); const base = `[cron:${input.job.id} ${input.job.name}] ${input.message}`.trim(); @@ -518,6 +606,7 @@ async function prepareCronRunContext(params: { persistSessionEntry, withRunSession, agentPayload, + deliveryPlan, resolvedDelivery, deliveryRequested, toolPolicy, @@ -641,7 +730,11 @@ async function finalizeCronRun(params: { finalAssistantVisibleText: finalRunResult.meta?.finalAssistantVisibleText, preferFinalAssistantVisibleText: prepared.resolvedDelivery.channel === "telegram", }); - const resolveRunOutcome = (result?: { delivered?: boolean; deliveryAttempted?: boolean }) => + const resolveRunOutcome = (result?: { + delivered?: boolean; + deliveryAttempted?: boolean; + delivery?: CronDeliveryTrace; + }) => prepared.withRunSession({ status: hasFatalErrorPayload ? "error" : "ok", ...(hasFatalErrorPayload @@ -651,6 +744,7 @@ async function finalizeCronRun(params: { outputText, delivered: result?.delivered, deliveryAttempted: result?.deliveryAttempted, + delivery: result?.delivery, ...telemetry, }); @@ -702,11 +796,19 @@ async function finalizeCronRun(params: { abortReason: params.abortReason, withRunSession: prepared.withRunSession, }); + const deliveryTrace = buildCronDeliveryTrace({ + deliveryPlan: prepared.deliveryPlan, + resolvedDelivery: prepared.resolvedDelivery, + messagingToolSentTargets, + fallbackUsed: deliveryResult.deliveryAttempted && !skipMessagingToolDelivery, + delivered: deliveryResult.delivered, + }); if (deliveryResult.result) { const resultWithDeliveryMeta: RunCronAgentTurnResult = { ...deliveryResult.result, deliveryAttempted: deliveryResult.result.deliveryAttempted ?? deliveryResult.deliveryAttempted, + delivery: deliveryTrace, }; if (!hasFatalErrorPayload || deliveryResult.result.status !== "ok") { return resultWithDeliveryMeta; @@ -714,6 +816,7 @@ async function finalizeCronRun(params: { return resolveRunOutcome({ delivered: deliveryResult.result.delivered, deliveryAttempted: resultWithDeliveryMeta.deliveryAttempted, + delivery: deliveryTrace, }); } summary = deliveryResult.summary; @@ -721,6 +824,7 @@ async function finalizeCronRun(params: { return resolveRunOutcome({ delivered: deliveryResult.delivered, deliveryAttempted: deliveryResult.deliveryAttempted, + delivery: deliveryTrace, }); } diff --git a/src/cron/isolated-agent/run.types.ts b/src/cron/isolated-agent/run.types.ts index e38a1779e7c..5d22d2266d5 100644 --- a/src/cron/isolated-agent/run.types.ts +++ b/src/cron/isolated-agent/run.types.ts @@ -1,4 +1,4 @@ -import type { CronRunOutcome, CronRunTelemetry } from "../types.js"; +import type { CronDeliveryTrace, CronRunOutcome, CronRunTelemetry } from "../types.js"; export type RunCronAgentTurnResult = { /** Last non-empty agent text output (not truncated). */ @@ -16,5 +16,6 @@ export type RunCronAgentTurnResult = { * cannot guarantee a final delivery ack synchronously. */ deliveryAttempted?: boolean; + delivery?: CronDeliveryTrace; } & CronRunOutcome & CronRunTelemetry; diff --git a/src/cron/run-log.test.ts b/src/cron/run-log.test.ts index bea443af429..bf3626771a3 100644 --- a/src/cron/run-log.test.ts +++ b/src/cron/run-log.test.ts @@ -209,6 +209,13 @@ describe("cron run log", () => { delivered: true, deliveryStatus: "not-delivered", deliveryError: "announce failed", + delivery: { + intended: { channel: "last", to: null, source: "last" }, + resolved: { ok: true, channel: "telegram", to: "-100", source: "last" }, + messageToolSentTo: [{ channel: "telegram", to: "-100" }], + fallbackUsed: false, + delivered: true, + }, }), ].join("\n") + "\n", "utf-8", @@ -220,6 +227,13 @@ describe("cron run log", () => { expect(entries[0]?.delivered).toBe(true); expect(entries[0]?.deliveryStatus).toBe("not-delivered"); expect(entries[0]?.deliveryError).toBe("announce failed"); + expect(entries[0]?.delivery).toEqual({ + intended: { channel: "last", to: null, source: "last" }, + resolved: { ok: true, channel: "telegram", to: "-100", source: "last" }, + messageToolSentTo: [{ channel: "telegram", to: "-100" }], + fallbackUsed: false, + delivered: true, + }); }); }); diff --git a/src/cron/run-log.ts b/src/cron/run-log.ts index 0a14a343238..98052846fc3 100644 --- a/src/cron/run-log.ts +++ b/src/cron/run-log.ts @@ -8,7 +8,12 @@ import { normalizeOptionalString, normalizeStringifiedOptionalString, } from "../shared/string-coerce.js"; -import type { CronDeliveryStatus, CronRunStatus, CronRunTelemetry } from "./types.js"; +import type { + CronDeliveryStatus, + CronDeliveryTrace, + CronRunStatus, + CronRunTelemetry, +} from "./types.js"; export type CronRunLogEntry = { ts: number; @@ -20,6 +25,7 @@ export type CronRunLogEntry = { delivered?: boolean; deliveryStatus?: CronDeliveryStatus; deliveryError?: string; + delivery?: CronDeliveryTrace; sessionId?: string; sessionKey?: string; runAtMs?: number; @@ -319,6 +325,9 @@ function parseAllRunLogEntries(raw: string, opts?: { jobId?: string }): CronRunL if (typeof obj.deliveryError === "string") { entry.deliveryError = obj.deliveryError; } + if (obj.delivery && typeof obj.delivery === "object") { + entry.delivery = obj.delivery; + } if (typeof obj.sessionId === "string" && obj.sessionId.trim().length > 0) { entry.sessionId = obj.sessionId; } @@ -375,7 +384,20 @@ export async function readCronRunLogEntriesPage( statuses, deliveryStatuses, query, - queryTextForEntry: (entry) => [entry.summary ?? "", entry.error ?? "", entry.jobId].join(" "), + queryTextForEntry: (entry) => + [ + entry.summary ?? "", + entry.error ?? "", + entry.jobId, + entry.delivery?.intended?.channel ?? "", + entry.delivery?.intended?.to ?? "", + entry.delivery?.resolved?.channel ?? "", + entry.delivery?.resolved?.to ?? "", + ...(entry.delivery?.messageToolSentTo ?? []).flatMap((target) => [ + target.channel, + target.to ?? "", + ]), + ].join(" "), }); const sorted = sortDir === "asc" @@ -432,7 +454,20 @@ export async function readCronRunLogEntriesPageAll( query, queryTextForEntry: (entry) => { const jobName = opts.jobNameById?.[entry.jobId] ?? ""; - return [entry.summary ?? "", entry.error ?? "", entry.jobId, jobName].join(" "); + return [ + entry.summary ?? "", + entry.error ?? "", + entry.jobId, + jobName, + entry.delivery?.intended?.channel ?? "", + entry.delivery?.intended?.to ?? "", + entry.delivery?.resolved?.channel ?? "", + entry.delivery?.resolved?.to ?? "", + ...(entry.delivery?.messageToolSentTo ?? []).flatMap((target) => [ + target.channel, + target.to ?? "", + ]), + ].join(" "); }, }); const sorted = diff --git a/src/cron/service/ops.ts b/src/cron/service/ops.ts index bd7f38be019..3c8ec5e04fd 100644 --- a/src/cron/service/ops.ts +++ b/src/cron/service/ops.ts @@ -643,6 +643,7 @@ async function finishPreparedManualRun( delivered: coreResult.delivered, deliveryStatus: job.state.lastDeliveryStatus, deliveryError: job.state.lastDeliveryError, + delivery: coreResult.delivery, sessionId: coreResult.sessionId, sessionKey: coreResult.sessionKey, runAtMs: startedAt, diff --git a/src/cron/service/state.ts b/src/cron/service/state.ts index a9adc290ec1..6cd7f6e1612 100644 --- a/src/cron/service/state.ts +++ b/src/cron/service/state.ts @@ -2,6 +2,7 @@ import type { CronConfig } from "../../config/types.cron.js"; import type { HeartbeatRunResult, HeartbeatWakeRequest } from "../../infra/heartbeat-wake.js"; import type { CronDeliveryStatus, + CronDeliveryTrace, CronJob, CronJobCreate, CronJobPatch, @@ -23,6 +24,7 @@ export type CronEvent = { delivered?: boolean; deliveryStatus?: CronDeliveryStatus; deliveryError?: string; + delivery?: CronDeliveryTrace; sessionId?: string; sessionKey?: string; nextRunAtMs?: number; @@ -100,6 +102,7 @@ export type CronServiceDeps = { * if the final per-message ack status is uncertain. */ deliveryAttempted?: boolean; + delivery?: CronDeliveryTrace; } & CronRunOutcome & CronRunTelemetry >; diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 890b7574370..f8d8e93c4d6 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -14,6 +14,7 @@ import { createCronExecutionId } from "../run-id.js"; import { sweepCronRunSessions } from "../session-reaper.js"; import type { CronDeliveryStatus, + CronDeliveryTrace, CronJob, CronMessageChannel, CronRunOutcome, @@ -1134,7 +1135,12 @@ export async function executeJobCore( job: CronJob, abortSignal?: AbortSignal, ): Promise< - CronRunOutcome & CronRunTelemetry & { delivered?: boolean; deliveryAttempted?: boolean } + CronRunOutcome & + CronRunTelemetry & { + delivered?: boolean; + deliveryAttempted?: boolean; + delivery?: CronDeliveryTrace; + } > { const resolveAbortError = () => ({ status: "error" as const, @@ -1178,7 +1184,12 @@ async function executeMainSessionCronJob( abortSignal: AbortSignal | undefined, waitWithAbort: (ms: number) => Promise, ): Promise< - CronRunOutcome & CronRunTelemetry & { delivered?: boolean; deliveryAttempted?: boolean } + CronRunOutcome & + CronRunTelemetry & { + delivered?: boolean; + deliveryAttempted?: boolean; + delivery?: CronDeliveryTrace; + } > { const text = resolveJobPayloadTextForMain(job); if (!text) { @@ -1275,7 +1286,12 @@ async function executeDetachedCronJob( abortSignal: AbortSignal | undefined, resolveAbortError: () => { status: "error"; error: string }, ): Promise< - CronRunOutcome & CronRunTelemetry & { delivered?: boolean; deliveryAttempted?: boolean } + CronRunOutcome & + CronRunTelemetry & { + delivered?: boolean; + deliveryAttempted?: boolean; + delivery?: CronDeliveryTrace; + } > { if (job.payload.kind !== "agentTurn") { return { status: "skipped", error: "isolated job requires payload.kind=agentTurn" }; @@ -1300,6 +1316,7 @@ async function executeDetachedCronJob( summary: res.summary, delivered: res.delivered, deliveryAttempted: res.deliveryAttempted, + delivery: res.delivery, sessionId: res.sessionId, sessionKey: res.sessionKey, model: res.model, @@ -1330,6 +1347,7 @@ export async function executeJob( let coreResult: { status: CronRunStatus; delivered?: boolean; + delivery?: CronDeliveryTrace; } & CronRunOutcome & CronRunTelemetry; try { @@ -1362,6 +1380,7 @@ function emitJobFinished( result: { status: CronRunStatus; delivered?: boolean; + delivery?: CronDeliveryTrace; } & CronRunOutcome & CronRunTelemetry, runAtMs: number, @@ -1375,6 +1394,7 @@ function emitJobFinished( delivered: result.delivered, deliveryStatus: job.state.lastDeliveryStatus, deliveryError: job.state.lastDeliveryError, + delivery: result.delivery, sessionId: result.sessionId, sessionKey: result.sessionKey, runAtMs, diff --git a/src/cron/types.ts b/src/cron/types.ts index 047086a533a..8f85f55f53e 100644 --- a/src/cron/types.ts +++ b/src/cron/types.ts @@ -46,6 +46,29 @@ export type CronDeliveryPatch = Partial; export type CronRunStatus = "ok" | "error" | "skipped"; export type CronDeliveryStatus = "delivered" | "not-delivered" | "unknown" | "not-requested"; +export type CronDeliveryTraceTarget = { + channel?: string; + to?: string | null; + accountId?: string; + threadId?: string | number; + source?: "explicit" | "last"; +}; + +export type CronDeliveryTraceMessageTarget = { + channel: string; + to?: string; + accountId?: string; + threadId?: string; +}; + +export type CronDeliveryTrace = { + intended?: CronDeliveryTraceTarget; + resolved?: CronDeliveryTraceTarget & { ok: boolean; error?: string }; + messageToolSentTo?: CronDeliveryTraceMessageTarget[]; + fallbackUsed?: boolean; + delivered?: boolean; +}; + export type CronUsageSummary = { input_tokens?: number; output_tokens?: number; diff --git a/src/gateway/server-cron.ts b/src/gateway/server-cron.ts index 301cea1894a..98e0334956b 100644 --- a/src/gateway/server-cron.ts +++ b/src/gateway/server-cron.ts @@ -562,6 +562,7 @@ export function buildGatewayCronService(params: { delivered: evt.delivered, deliveryStatus: evt.deliveryStatus, deliveryError: evt.deliveryError, + delivery: evt.delivery, sessionId: evt.sessionId, sessionKey: evt.sessionKey, runAtMs: evt.runAtMs,