mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-04 15:10:22 +00:00
Cron: enforce cron-owned delivery
This commit is contained in:
@@ -105,7 +105,6 @@ function makeBaseParams(overrides: { synthesizedText?: string; deliveryRequested
|
||||
resolvedDelivery,
|
||||
deliveryRequested: overrides.deliveryRequested ?? true,
|
||||
skipHeartbeatDelivery: false,
|
||||
skipMessagingToolDelivery: false,
|
||||
deliveryBestEffort: false,
|
||||
deliveryPayloadHasStructuredContent: false,
|
||||
deliveryPayloads: overrides.synthesizedText ? [{ text: overrides.synthesizedText }] : [],
|
||||
|
||||
@@ -83,7 +83,6 @@ type DispatchCronDeliveryParams = {
|
||||
resolvedDelivery: DeliveryTargetResolution;
|
||||
deliveryRequested: boolean;
|
||||
skipHeartbeatDelivery: boolean;
|
||||
skipMessagingToolDelivery: boolean;
|
||||
deliveryBestEffort: boolean;
|
||||
deliveryPayloadHasStructuredContent: boolean;
|
||||
deliveryPayloads: ReplyPayload[];
|
||||
@@ -197,10 +196,10 @@ export async function dispatchCronDelivery(
|
||||
let synthesizedText = params.synthesizedText;
|
||||
let deliveryPayloads = params.deliveryPayloads;
|
||||
|
||||
// `true` means we confirmed at least one outbound send reached the target.
|
||||
// Keep this strict so timer fallback can safely decide whether to wake main.
|
||||
let delivered = params.skipMessagingToolDelivery;
|
||||
let deliveryAttempted = params.skipMessagingToolDelivery;
|
||||
// `true` means we confirmed cron handled this run as delivered/suppressed.
|
||||
// Keep actual sends strict so cron status is still meaningful.
|
||||
let delivered = false;
|
||||
let deliveryAttempted = false;
|
||||
const failDeliveryTarget = (error: string) =>
|
||||
params.withRunSession({
|
||||
status: "error",
|
||||
@@ -404,11 +403,7 @@ export async function dispatchCronDelivery(
|
||||
}
|
||||
};
|
||||
|
||||
if (
|
||||
params.deliveryRequested &&
|
||||
!params.skipHeartbeatDelivery &&
|
||||
!params.skipMessagingToolDelivery
|
||||
) {
|
||||
if (params.deliveryRequested && !params.skipHeartbeatDelivery) {
|
||||
if (!params.resolvedDelivery.ok) {
|
||||
if (!params.deliveryBestEffort) {
|
||||
return {
|
||||
|
||||
@@ -55,7 +55,7 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
|
||||
restoreFastTestEnv(previousFastTestEnv);
|
||||
});
|
||||
|
||||
it('keeps the message tool enabled when delivery.mode is "none"', async () => {
|
||||
it('disables the message tool when delivery.mode is "none"', async () => {
|
||||
mockFallbackPassthrough();
|
||||
resolveCronDeliveryPlanMock.mockReturnValue({
|
||||
requested: false,
|
||||
@@ -65,7 +65,7 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
|
||||
await runCronIsolatedAgentTurn(makeParams());
|
||||
|
||||
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
|
||||
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.disableMessageTool).toBe(false);
|
||||
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.disableMessageTool).toBe(true);
|
||||
});
|
||||
|
||||
it("disables the message tool when cron delivery is active", async () => {
|
||||
|
||||
@@ -55,11 +55,7 @@ import {
|
||||
} from "../../security/external-content.js";
|
||||
import { resolveCronDeliveryPlan } from "../delivery.js";
|
||||
import type { CronJob, CronRunOutcome, CronRunTelemetry } from "../types.js";
|
||||
import {
|
||||
dispatchCronDelivery,
|
||||
matchesMessagingToolDeliveryTarget,
|
||||
resolveCronDeliveryBestEffort,
|
||||
} from "./delivery-dispatch.js";
|
||||
import { dispatchCronDelivery, resolveCronDeliveryBestEffort } from "./delivery-dispatch.js";
|
||||
import { resolveDeliveryTarget } from "./delivery-target.js";
|
||||
import {
|
||||
isHeartbeatOnlyResponse,
|
||||
@@ -78,11 +74,9 @@ export type RunCronAgentTurnResult = {
|
||||
/** Last non-empty agent text output (not truncated). */
|
||||
outputText?: string;
|
||||
/**
|
||||
* `true` when the isolated run already delivered its output to the target
|
||||
* channel (via outbound payloads, the subagent announce flow, or a matching
|
||||
* messaging-tool send). Callers should skip posting a summary to the main
|
||||
* session to avoid duplicate
|
||||
* messages. See: https://github.com/openclaw/openclaw/issues/15692
|
||||
* `true` when the cron runner already handled the run's delivery outcome.
|
||||
* This now refers only to cron-owned delivery behavior, not ad hoc agent
|
||||
* messaging. Callers can use it to distinguish delivered vs suppressed runs.
|
||||
*/
|
||||
delivered?: boolean;
|
||||
/**
|
||||
@@ -153,7 +147,9 @@ function resolveCronToolPolicy(params: {
|
||||
// was successfully resolved. When resolution fails the agent should not
|
||||
// be blocked by a target it cannot satisfy (#27898).
|
||||
requireExplicitMessageTarget: params.deliveryRequested && params.resolvedDelivery.ok,
|
||||
disableMessageTool: params.deliveryRequested,
|
||||
// Cron runs now always route user-facing delivery through the cron runner
|
||||
// itself so jobs cannot silently bypass delivery policy with ad hoc sends.
|
||||
disableMessageTool: true,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -806,17 +802,6 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
// Skip delivery for heartbeat-only responses (HEARTBEAT_OK with no real content).
|
||||
const ackMaxChars = resolveHeartbeatAckMaxChars(agentCfg);
|
||||
const skipHeartbeatDelivery = deliveryRequested && isHeartbeatOnlyResponse(payloads, ackMaxChars);
|
||||
const skipMessagingToolDelivery =
|
||||
deliveryRequested &&
|
||||
finalRunResult.didSendViaMessagingTool === true &&
|
||||
(finalRunResult.messagingToolSentTargets ?? []).some((target) =>
|
||||
matchesMessagingToolDeliveryTarget(target, {
|
||||
channel: resolvedDelivery.channel,
|
||||
to: resolvedDelivery.to,
|
||||
accountId: resolvedDelivery.accountId,
|
||||
}),
|
||||
);
|
||||
|
||||
const deliveryResult = await dispatchCronDelivery({
|
||||
cfg: params.cfg,
|
||||
cfgWithAgentDefaults,
|
||||
@@ -831,7 +816,6 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
resolvedDelivery,
|
||||
deliveryRequested,
|
||||
skipHeartbeatDelivery,
|
||||
skipMessagingToolDelivery,
|
||||
deliveryBestEffort,
|
||||
deliveryPayloadHasStructuredContent,
|
||||
deliveryPayloads,
|
||||
|
||||
@@ -620,14 +620,14 @@ describe("CronService", () => {
|
||||
await stopCronAndCleanup(cron, store);
|
||||
});
|
||||
|
||||
it("runs an isolated job and posts summary to main", async () => {
|
||||
it("runs an isolated job without posting a fallback summary to main", async () => {
|
||||
const runIsolatedAgentJob = vi.fn(async () => ({ status: "ok" as const, summary: "done" }));
|
||||
const { store, cron, enqueueSystemEvent, requestHeartbeatNow, events } =
|
||||
await createIsolatedAnnounceHarness(runIsolatedAgentJob);
|
||||
await runIsolatedAnnounceScenario({ cron, events, name: "weekly" });
|
||||
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1);
|
||||
expectMainSystemEventPosted(enqueueSystemEvent, "Cron: done");
|
||||
expect(requestHeartbeatNow).toHaveBeenCalled();
|
||||
expect(enqueueSystemEvent).not.toHaveBeenCalled();
|
||||
expect(requestHeartbeatNow).not.toHaveBeenCalled();
|
||||
await stopCronAndCleanup(cron, store);
|
||||
});
|
||||
|
||||
@@ -685,7 +685,7 @@ describe("CronService", () => {
|
||||
await stopCronAndCleanup(cron, store);
|
||||
});
|
||||
|
||||
it("posts last output to main even when isolated job errors", async () => {
|
||||
it("does not post a fallback main summary when an isolated job errors", async () => {
|
||||
const runIsolatedAgentJob = vi.fn(async () => ({
|
||||
status: "error" as const,
|
||||
summary: "last output",
|
||||
@@ -700,8 +700,8 @@ describe("CronService", () => {
|
||||
status: "error",
|
||||
});
|
||||
|
||||
expectMainSystemEventPosted(enqueueSystemEvent, "Cron (error): last output");
|
||||
expect(requestHeartbeatNow).toHaveBeenCalled();
|
||||
expect(enqueueSystemEvent).not.toHaveBeenCalled();
|
||||
expect(requestHeartbeatNow).not.toHaveBeenCalled();
|
||||
await stopCronAndCleanup(cron, store);
|
||||
});
|
||||
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
import type { CronConfig, CronRetryOn } from "../../config/types.cron.js";
|
||||
import { isCronSystemEvent } from "../../infra/heartbeat-events-filter.js";
|
||||
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
|
||||
import { DEFAULT_AGENT_ID } from "../../routing/session-key.js";
|
||||
import { resolveCronDeliveryPlan } from "../delivery.js";
|
||||
import { shouldEnqueueCronMainSummary } from "../heartbeat-policy.js";
|
||||
import { sweepCronRunSessions } from "../session-reaper.js";
|
||||
import type {
|
||||
CronDeliveryStatus,
|
||||
@@ -1138,46 +1136,6 @@ export async function executeJobCore(
|
||||
return { status: "error", error: timeoutErrorMessage() };
|
||||
}
|
||||
|
||||
// Post a short summary back to the main session only when announce
|
||||
// delivery was requested and we are confident no outbound delivery path
|
||||
// ran. If delivery was attempted but final ack is uncertain, suppress the
|
||||
// main summary to avoid duplicate user-facing sends.
|
||||
// See: https://github.com/openclaw/openclaw/issues/15692
|
||||
//
|
||||
// Also suppress heartbeat-only summaries (e.g. "HEARTBEAT_OK") — these
|
||||
// are internal ack tokens that should never leak into user conversations.
|
||||
// See: https://github.com/openclaw/openclaw/issues/32013
|
||||
const summaryText = res.summary?.trim();
|
||||
const deliveryPlan = resolveCronDeliveryPlan(job);
|
||||
const suppressMainSummary =
|
||||
res.status === "error" && res.errorKind === "delivery-target" && deliveryPlan.requested;
|
||||
if (
|
||||
shouldEnqueueCronMainSummary({
|
||||
summaryText,
|
||||
deliveryRequested: deliveryPlan.requested,
|
||||
delivered: res.delivered,
|
||||
deliveryAttempted: res.deliveryAttempted,
|
||||
suppressMainSummary,
|
||||
isCronSystemEvent,
|
||||
})
|
||||
) {
|
||||
const prefix = "Cron";
|
||||
const label =
|
||||
res.status === "error" ? `${prefix} (error): ${summaryText}` : `${prefix}: ${summaryText}`;
|
||||
state.deps.enqueueSystemEvent(label, {
|
||||
agentId: job.agentId,
|
||||
sessionKey: job.sessionKey,
|
||||
contextKey: `cron:${job.id}`,
|
||||
});
|
||||
if (job.wakeMode === "now") {
|
||||
state.deps.requestHeartbeatNow({
|
||||
reason: `cron:${job.id}`,
|
||||
agentId: job.agentId,
|
||||
sessionKey: job.sessionKey,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
status: res.status,
|
||||
error: res.error,
|
||||
|
||||
Reference in New Issue
Block a user