From d4e59a3666d810f9574392c70abb942e0c3b0dd8 Mon Sep 17 00:00:00 2001 From: Mariano Date: Mon, 9 Mar 2026 20:12:37 +0100 Subject: [PATCH] Cron: enforce cron-owned delivery contract (#40998) Merged via squash. Prepared head SHA: 5877389e33d5b3a518925b5793a6f6294cb3fb3d Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com> Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com> Reviewed-by: @mbelinky --- CHANGELOG.md | 2 + CONTRIBUTING.md | 3 +- docs/automation/cron-jobs.md | 1 + docs/cli/cron.md | 6 + docs/cli/doctor.md | 1 + docs/gateway/doctor.md | 20 + src/cli/cron-cli/register.ts | 2 +- src/commands/doctor-cron.test.ts | 158 ++++++ src/commands/doctor-cron.ts | 186 +++++++ src/commands/doctor.ts | 6 + src/cron/delivery.failure-notify.test.ts | 143 +++++ src/cron/delivery.test.ts | 40 ++ .../isolated-agent.delivery.test-helpers.ts | 2 + ...p-recipient-besteffortdeliver-true.test.ts | 2 + .../delivery-dispatch.double-announce.test.ts | 43 +- .../delivery-dispatch.named-agent.test.ts | 9 + src/cron/isolated-agent/delivery-dispatch.ts | 18 +- .../run.message-tool-policy.test.ts | 20 +- src/cron/isolated-agent/run.ts | 24 +- src/cron/service.delivery-plan.test.ts | 8 +- ...ce.heartbeat-ok-summary-suppressed.test.ts | 9 +- ...runs-one-shot-main-job-disables-it.test.ts | 12 +- src/cron/service/store.ts | 435 +--------------- src/cron/service/timer.ts | 42 -- src/cron/store-migration.test.ts | 78 +++ src/cron/store-migration.ts | 491 ++++++++++++++++++ src/gateway/server.cron.test.ts | 28 +- src/gateway/server.hooks.test.ts | 4 + src/gateway/server/hooks.ts | 1 + 29 files changed, 1277 insertions(+), 517 deletions(-) create mode 100644 src/commands/doctor-cron.test.ts create mode 100644 src/commands/doctor-cron.ts create mode 100644 src/cron/delivery.failure-notify.test.ts create mode 100644 src/cron/store-migration.test.ts create mode 100644 src/cron/store-migration.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e51ea3a0a1..4be8bad0eaa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,8 @@ Docs: https://docs.openclaw.ai ### Breaking +- Cron/doctor: tighten isolated cron delivery so cron jobs can no longer notify through ad hoc agent sends or fallback main-session summaries, and add `openclaw doctor --fix` migration for legacy cron storage and legacy notify/webhook delivery metadata. (#40998) Thanks @mbelinky. + ### Fixes - macOS/LaunchAgent install: tighten LaunchAgent directory and plist permissions during install so launchd bootstrap does not fail when the target home path or generated plist inherited group/world-writable modes. diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 3d02d1f2059..1127d7dc791 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -58,7 +58,6 @@ Welcome to the lobster tank! 🦞 - **Jonathan Taylor** - ACP subsystem, Gateway features/bugs, Gog/Mog/Sog CLI's, SEDMAT - GitHub [@visionik](https://github.com/visionik) · X: [@visionik](https://x.com/visionik) - - **Josh Lehman** - Compaction, Tlon/Urbit subsystem - GitHub [@jalehman](https://github.com/jalehman) · X: [@jlehman\_](https://x.com/jlehman_) @@ -73,7 +72,7 @@ Welcome to the lobster tank! 🦞 - **Robin Waslander** - Security, PR triage, bug fixes - GitHub: [@hydro13](https://github.com/hydro13) · X: [@Robin_waslander](https://x.com/Robin_waslander) - + ## How to Contribute 1. **Bugs & small fixes** → Open a PR! diff --git a/docs/automation/cron-jobs.md b/docs/automation/cron-jobs.md index 47bae78b86f..a0b5e505476 100644 --- a/docs/automation/cron-jobs.md +++ b/docs/automation/cron-jobs.md @@ -29,6 +29,7 @@ Troubleshooting: [/automation/troubleshooting](/automation/troubleshooting) - Wakeups are first-class: a job can request “wake now” vs “next heartbeat”. - Webhook posting is per job via `delivery.mode = "webhook"` + `delivery.to = ""`. - Legacy fallback remains for stored jobs with `notify: true` when `cron.webhook` is set, migrate those jobs to webhook delivery mode. +- For upgrades, `openclaw doctor --fix` can normalize legacy cron store fields before the scheduler touches them. ## Quick start (actionable) diff --git a/docs/cli/cron.md b/docs/cli/cron.md index 28e61e20c99..6ee25859749 100644 --- a/docs/cli/cron.md +++ b/docs/cli/cron.md @@ -30,6 +30,12 @@ Note: retention/pruning is controlled in config: - `cron.sessionRetention` (default `24h`) prunes completed isolated run sessions. - `cron.runLog.maxBytes` + `cron.runLog.keepLines` prune `~/.openclaw/cron/runs/.jsonl`. +Upgrade note: if you have older cron jobs from before the current delivery/store format, run +`openclaw doctor --fix`. Doctor now normalizes legacy cron fields (`jobId`, `schedule.cron`, +top-level delivery fields, payload `provider` delivery aliases) and migrates simple +`notify: true` webhook fallback jobs to explicit webhook delivery when `cron.webhook` is +configured. + ## Common edits Update delivery settings without changing the message: diff --git a/docs/cli/doctor.md b/docs/cli/doctor.md index d53d86452f3..90e5fa7d7a2 100644 --- a/docs/cli/doctor.md +++ b/docs/cli/doctor.md @@ -28,6 +28,7 @@ Notes: - Interactive prompts (like keychain/OAuth fixes) only run when stdin is a TTY and `--non-interactive` is **not** set. Headless runs (cron, Telegram, no terminal) will skip prompts. - `--fix` (alias for `--repair`) writes a backup to `~/.openclaw/openclaw.json.bak` and drops unknown config keys, listing each removal. - State integrity checks now detect orphan transcript files in the sessions directory and can archive them as `.deleted.` to reclaim space safely. +- Doctor also scans `~/.openclaw/cron/jobs.json` (or `cron.store`) for legacy cron job shapes and can rewrite them in place before the scheduler has to auto-normalize them at runtime. - Doctor includes a memory-search readiness check and can recommend `openclaw configure --section model` when embedding credentials are missing. - If sandbox mode is enabled but Docker is unavailable, doctor reports a high-signal warning with remediation (`install Docker` or `openclaw config set agents.defaults.sandbox.mode off`). diff --git a/docs/gateway/doctor.md b/docs/gateway/doctor.md index 2550406f4ff..b46b90520d1 100644 --- a/docs/gateway/doctor.md +++ b/docs/gateway/doctor.md @@ -65,6 +65,7 @@ cat ~/.openclaw/openclaw.json - Config normalization for legacy values. - OpenCode Zen provider override warnings (`models.providers.opencode`). - Legacy on-disk state migration (sessions/agent dir/WhatsApp auth). +- Legacy cron store migration (`jobId`, `schedule.cron`, top-level delivery/payload fields, payload `provider`, simple `notify: true` webhook fallback jobs). - State integrity and permissions checks (sessions, transcripts, state dir). - Config file permission checks (chmod 600) when running locally. - Model auth health: checks OAuth expiry, can refresh expiring tokens, and reports auth-profile cooldown/disabled states. @@ -158,6 +159,25 @@ the legacy sessions + agent dir on startup so history/auth/models land in the per-agent path without a manual doctor run. WhatsApp auth is intentionally only migrated via `openclaw doctor`. +### 3b) Legacy cron store migrations + +Doctor also checks the cron job store (`~/.openclaw/cron/jobs.json` by default, +or `cron.store` when overridden) for old job shapes that the scheduler still +accepts for compatibility. + +Current cron cleanups include: + +- `jobId` → `id` +- `schedule.cron` → `schedule.expr` +- top-level payload fields (`message`, `model`, `thinking`, ...) → `payload` +- top-level delivery fields (`deliver`, `channel`, `to`, `provider`, ...) → `delivery` +- payload `provider` delivery aliases → explicit `delivery.channel` +- simple legacy `notify: true` webhook fallback jobs → explicit `delivery.mode="webhook"` with `delivery.to=cron.webhook` + +Doctor only auto-migrates `notify: true` jobs when it can do so without +changing behavior. If a job combines legacy notify fallback with an existing +non-webhook delivery mode, doctor warns and leaves that job for manual review. + ### 4) State integrity checks (session persistence, routing, and safety) The state directory is the operational brainstem. If it vanishes, you lose diff --git a/src/cli/cron-cli/register.ts b/src/cli/cron-cli/register.ts index a796583fa21..35f80dbda06 100644 --- a/src/cli/cron-cli/register.ts +++ b/src/cli/cron-cli/register.ts @@ -16,7 +16,7 @@ export function registerCronCli(program: Command) { .addHelpText( "after", () => - `\n${theme.muted("Docs:")} ${formatDocsLink("/cli/cron", "docs.openclaw.ai/cli/cron")}\n`, + `\n${theme.muted("Docs:")} ${formatDocsLink("/cli/cron", "docs.openclaw.ai/cli/cron")}\n${theme.muted("Upgrade tip:")} run \`openclaw doctor --fix\` to normalize legacy cron job storage.\n`, ); registerCronStatusCommand(cron); diff --git a/src/commands/doctor-cron.test.ts b/src/commands/doctor-cron.test.ts new file mode 100644 index 00000000000..8c9faf0e24d --- /dev/null +++ b/src/commands/doctor-cron.test.ts @@ -0,0 +1,158 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import type { OpenClawConfig } from "../config/config.js"; +import * as noteModule from "../terminal/note.js"; +import { maybeRepairLegacyCronStore } from "./doctor-cron.js"; + +let tempRoot: string | null = null; + +async function makeTempStorePath() { + tempRoot = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-doctor-cron-")); + return path.join(tempRoot, "cron", "jobs.json"); +} + +afterEach(async () => { + vi.restoreAllMocks(); + if (tempRoot) { + await fs.rm(tempRoot, { recursive: true, force: true }); + tempRoot = null; + } +}); + +function makePrompter(confirmResult = true) { + return { + confirm: vi.fn().mockResolvedValue(confirmResult), + }; +} + +describe("maybeRepairLegacyCronStore", () => { + it("repairs legacy cron store fields and migrates notify fallback to webhook delivery", async () => { + const storePath = await makeTempStorePath(); + await fs.mkdir(path.dirname(storePath), { recursive: true }); + await fs.writeFile( + storePath, + JSON.stringify( + { + version: 1, + jobs: [ + { + jobId: "legacy-job", + name: "Legacy job", + notify: true, + createdAtMs: Date.parse("2026-02-01T00:00:00.000Z"), + updatedAtMs: Date.parse("2026-02-02T00:00:00.000Z"), + schedule: { kind: "cron", cron: "0 7 * * *", tz: "UTC" }, + payload: { + kind: "systemEvent", + text: "Morning brief", + }, + state: {}, + }, + ], + }, + null, + 2, + ), + "utf-8", + ); + + const noteSpy = vi.spyOn(noteModule, "note").mockImplementation(() => {}); + const cfg: OpenClawConfig = { + cron: { + store: storePath, + webhook: "https://example.invalid/cron-finished", + }, + }; + + await maybeRepairLegacyCronStore({ + cfg, + options: {}, + prompter: makePrompter(true), + }); + + const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as { + jobs: Array>; + }; + const [job] = persisted.jobs; + expect(job?.jobId).toBeUndefined(); + expect(job?.id).toBe("legacy-job"); + expect(job?.notify).toBeUndefined(); + expect(job?.schedule).toMatchObject({ + kind: "cron", + expr: "0 7 * * *", + tz: "UTC", + }); + expect(job?.delivery).toMatchObject({ + mode: "webhook", + to: "https://example.invalid/cron-finished", + }); + expect(job?.payload).toMatchObject({ + kind: "systemEvent", + text: "Morning brief", + }); + + expect(noteSpy).toHaveBeenCalledWith( + expect.stringContaining("Legacy cron job storage detected"), + "Cron", + ); + expect(noteSpy).toHaveBeenCalledWith( + expect.stringContaining("Cron store normalized"), + "Doctor changes", + ); + }); + + it("warns instead of replacing announce delivery for notify fallback jobs", async () => { + const storePath = await makeTempStorePath(); + await fs.mkdir(path.dirname(storePath), { recursive: true }); + await fs.writeFile( + storePath, + JSON.stringify( + { + version: 1, + jobs: [ + { + id: "notify-and-announce", + name: "Notify and announce", + notify: true, + createdAtMs: Date.parse("2026-02-01T00:00:00.000Z"), + updatedAtMs: Date.parse("2026-02-02T00:00:00.000Z"), + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + wakeMode: "now", + payload: { kind: "agentTurn", message: "Status" }, + delivery: { mode: "announce", channel: "telegram", to: "123" }, + state: {}, + }, + ], + }, + null, + 2, + ), + "utf-8", + ); + + const noteSpy = vi.spyOn(noteModule, "note").mockImplementation(() => {}); + + await maybeRepairLegacyCronStore({ + cfg: { + cron: { + store: storePath, + webhook: "https://example.invalid/cron-finished", + }, + }, + options: { nonInteractive: true }, + prompter: makePrompter(true), + }); + + const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as { + jobs: Array>; + }; + expect(persisted.jobs[0]?.notify).toBe(true); + expect(noteSpy).toHaveBeenCalledWith( + expect.stringContaining('uses legacy notify fallback alongside delivery mode "announce"'), + "Doctor warnings", + ); + }); +}); diff --git a/src/commands/doctor-cron.ts b/src/commands/doctor-cron.ts new file mode 100644 index 00000000000..3dc6275e800 --- /dev/null +++ b/src/commands/doctor-cron.ts @@ -0,0 +1,186 @@ +import { formatCliCommand } from "../cli/command-format.js"; +import type { OpenClawConfig } from "../config/config.js"; +import { normalizeStoredCronJobs } from "../cron/store-migration.js"; +import { resolveCronStorePath, loadCronStore, saveCronStore } from "../cron/store.js"; +import type { CronJob } from "../cron/types.js"; +import { note } from "../terminal/note.js"; +import { shortenHomePath } from "../utils.js"; +import type { DoctorPrompter, DoctorOptions } from "./doctor-prompter.js"; + +type CronDoctorOutcome = { + changed: boolean; + warnings: string[]; +}; + +function pluralize(count: number, noun: string) { + return `${count} ${noun}${count === 1 ? "" : "s"}`; +} + +function formatLegacyIssuePreview(issues: Partial>): string[] { + const lines: string[] = []; + if (issues.jobId) { + lines.push(`- ${pluralize(issues.jobId, "job")} still uses legacy \`jobId\``); + } + if (issues.legacyScheduleString) { + lines.push( + `- ${pluralize(issues.legacyScheduleString, "job")} stores schedule as a bare string`, + ); + } + if (issues.legacyScheduleCron) { + lines.push(`- ${pluralize(issues.legacyScheduleCron, "job")} still uses \`schedule.cron\``); + } + if (issues.legacyPayloadKind) { + lines.push(`- ${pluralize(issues.legacyPayloadKind, "job")} needs payload kind normalization`); + } + if (issues.legacyPayloadProvider) { + lines.push( + `- ${pluralize(issues.legacyPayloadProvider, "job")} still uses payload \`provider\` as a delivery alias`, + ); + } + if (issues.legacyTopLevelPayloadFields) { + lines.push( + `- ${pluralize(issues.legacyTopLevelPayloadFields, "job")} still uses top-level payload fields`, + ); + } + if (issues.legacyTopLevelDeliveryFields) { + lines.push( + `- ${pluralize(issues.legacyTopLevelDeliveryFields, "job")} still uses top-level delivery fields`, + ); + } + if (issues.legacyDeliveryMode) { + lines.push( + `- ${pluralize(issues.legacyDeliveryMode, "job")} still uses delivery mode \`deliver\``, + ); + } + return lines; +} + +function trimString(value: unknown): string | undefined { + return typeof value === "string" && value.trim() ? value.trim() : undefined; +} + +function migrateLegacyNotifyFallback(params: { + jobs: Array>; + legacyWebhook?: string; +}): CronDoctorOutcome { + let changed = false; + const warnings: string[] = []; + + for (const raw of params.jobs) { + if (!("notify" in raw)) { + continue; + } + + const jobName = trimString(raw.name) ?? trimString(raw.id) ?? ""; + const notify = raw.notify === true; + if (!notify) { + delete raw.notify; + changed = true; + continue; + } + + const delivery = + raw.delivery && typeof raw.delivery === "object" && !Array.isArray(raw.delivery) + ? (raw.delivery as Record) + : null; + const mode = trimString(delivery?.mode)?.toLowerCase(); + const to = trimString(delivery?.to); + + if (mode === "webhook" && to) { + delete raw.notify; + changed = true; + continue; + } + + if ((mode === undefined || mode === "none" || mode === "webhook") && params.legacyWebhook) { + raw.delivery = { + ...delivery, + mode: "webhook", + to: to ?? params.legacyWebhook, + }; + delete raw.notify; + changed = true; + continue; + } + + if (!params.legacyWebhook) { + warnings.push( + `Cron job "${jobName}" still uses legacy notify fallback, but cron.webhook is unset so doctor cannot migrate it automatically.`, + ); + continue; + } + + warnings.push( + `Cron job "${jobName}" uses legacy notify fallback alongside delivery mode "${mode}". Migrate it manually so webhook delivery does not replace existing announce behavior.`, + ); + } + + return { changed, warnings }; +} + +export async function maybeRepairLegacyCronStore(params: { + cfg: OpenClawConfig; + options: DoctorOptions; + prompter: Pick; +}) { + const storePath = resolveCronStorePath(params.cfg.cron?.store); + const store = await loadCronStore(storePath); + const rawJobs = (store.jobs ?? []) as unknown as Array>; + if (rawJobs.length === 0) { + return; + } + + const normalized = normalizeStoredCronJobs(rawJobs); + const legacyWebhook = trimString(params.cfg.cron?.webhook); + const notifyCount = rawJobs.filter((job) => job.notify === true).length; + const previewLines = formatLegacyIssuePreview(normalized.issues); + if (notifyCount > 0) { + previewLines.push( + `- ${pluralize(notifyCount, "job")} still uses legacy \`notify: true\` webhook fallback`, + ); + } + if (previewLines.length === 0) { + return; + } + + note( + [ + `Legacy cron job storage detected at ${shortenHomePath(storePath)}.`, + ...previewLines, + `Repair with ${formatCliCommand("openclaw doctor --fix")} to normalize the store before the next scheduler run.`, + ].join("\n"), + "Cron", + ); + + const shouldRepair = + params.options.nonInteractive === true + ? true + : await params.prompter.confirm({ + message: "Repair legacy cron jobs now?", + initialValue: true, + }); + if (!shouldRepair) { + return; + } + + const notifyMigration = migrateLegacyNotifyFallback({ + jobs: rawJobs, + legacyWebhook, + }); + const changed = normalized.mutated || notifyMigration.changed; + if (!changed && notifyMigration.warnings.length === 0) { + return; + } + + if (changed) { + await saveCronStore(storePath, { + version: 1, + jobs: rawJobs as unknown as CronJob[], + }); + note(`Cron store normalized at ${shortenHomePath(storePath)}.`, "Doctor changes"); + } + + if (notifyMigration.warnings.length > 0) { + note(notifyMigration.warnings.join("\n"), "Doctor warnings"); + } +} diff --git a/src/commands/doctor.ts b/src/commands/doctor.ts index 2688774b8bb..bdde2781ff9 100644 --- a/src/commands/doctor.ts +++ b/src/commands/doctor.ts @@ -31,6 +31,7 @@ import { import { noteBootstrapFileSize } from "./doctor-bootstrap-size.js"; import { doctorShellCompletion } from "./doctor-completion.js"; import { loadAndMaybeMigrateDoctorConfig } from "./doctor-config-flow.js"; +import { maybeRepairLegacyCronStore } from "./doctor-cron.js"; import { maybeRepairGatewayDaemon } from "./doctor-gateway-daemon-flow.js"; import { checkGatewayHealth, probeGatewayMemoryStatus } from "./doctor-gateway-health.js"; import { @@ -220,6 +221,11 @@ export async function doctorCommand( await noteStateIntegrity(cfg, prompter, configResult.path ?? CONFIG_PATH); await noteSessionLockHealth({ shouldRepair: prompter.shouldRepair }); + await maybeRepairLegacyCronStore({ + cfg, + options, + prompter, + }); cfg = await maybeRepairSandboxImages(cfg, runtime, prompter); noteSandboxScopeWarnings(cfg); diff --git a/src/cron/delivery.failure-notify.test.ts b/src/cron/delivery.failure-notify.test.ts new file mode 100644 index 00000000000..98cb437c961 --- /dev/null +++ b/src/cron/delivery.failure-notify.test.ts @@ -0,0 +1,143 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +const mocks = vi.hoisted(() => ({ + resolveDeliveryTarget: vi.fn(), + deliverOutboundPayloads: vi.fn(), + resolveAgentOutboundIdentity: vi.fn().mockReturnValue({ kind: "identity" }), + buildOutboundSessionContext: vi.fn().mockReturnValue({ kind: "session" }), + createOutboundSendDeps: vi.fn().mockReturnValue({ kind: "deps" }), + warn: vi.fn(), +})); + +vi.mock("./isolated-agent/delivery-target.js", () => ({ + resolveDeliveryTarget: mocks.resolveDeliveryTarget, +})); + +vi.mock("../infra/outbound/deliver.js", () => ({ + deliverOutboundPayloads: mocks.deliverOutboundPayloads, +})); + +vi.mock("../infra/outbound/identity.js", () => ({ + resolveAgentOutboundIdentity: mocks.resolveAgentOutboundIdentity, +})); + +vi.mock("../infra/outbound/session-context.js", () => ({ + buildOutboundSessionContext: mocks.buildOutboundSessionContext, +})); + +vi.mock("../cli/outbound-send-deps.js", () => ({ + createOutboundSendDeps: mocks.createOutboundSendDeps, +})); + +vi.mock("../logging.js", () => ({ + getChildLogger: vi.fn(() => ({ + warn: mocks.warn, + })), +})); + +const { sendFailureNotificationAnnounce } = await import("./delivery.js"); + +describe("sendFailureNotificationAnnounce", () => { + beforeEach(() => { + vi.clearAllMocks(); + mocks.resolveDeliveryTarget.mockResolvedValue({ + ok: true, + channel: "telegram", + to: "123", + accountId: "bot-a", + threadId: 42, + mode: "explicit", + }); + mocks.deliverOutboundPayloads.mockResolvedValue([{ ok: true }]); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("delivers failure alerts to the resolved explicit target with strict send settings", async () => { + const deps = {} as never; + const cfg = {} as never; + + await sendFailureNotificationAnnounce( + deps, + cfg, + "main", + "job-1", + { channel: "telegram", to: "123", accountId: "bot-a" }, + "Cron failed", + ); + + expect(mocks.resolveDeliveryTarget).toHaveBeenCalledWith(cfg, "main", { + channel: "telegram", + to: "123", + accountId: "bot-a", + }); + expect(mocks.buildOutboundSessionContext).toHaveBeenCalledWith({ + cfg, + agentId: "main", + sessionKey: "cron:job-1:failure", + }); + expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith( + expect.objectContaining({ + cfg, + channel: "telegram", + to: "123", + accountId: "bot-a", + threadId: 42, + payloads: [{ text: "Cron failed" }], + session: { kind: "session" }, + identity: { kind: "identity" }, + bestEffort: false, + deps: { kind: "deps" }, + abortSignal: expect.any(AbortSignal), + }), + ); + }); + + it("does not send when target resolution fails", async () => { + mocks.resolveDeliveryTarget.mockResolvedValue({ + ok: false, + error: new Error("target missing"), + }); + + await sendFailureNotificationAnnounce( + {} as never, + {} as never, + "main", + "job-1", + { channel: "telegram", to: "123" }, + "Cron failed", + ); + + expect(mocks.deliverOutboundPayloads).not.toHaveBeenCalled(); + expect(mocks.warn).toHaveBeenCalledWith( + { error: "target missing" }, + "cron: failed to resolve failure destination target", + ); + }); + + it("swallows outbound delivery errors after logging", async () => { + mocks.deliverOutboundPayloads.mockRejectedValue(new Error("send failed")); + + await expect( + sendFailureNotificationAnnounce( + {} as never, + {} as never, + "main", + "job-1", + { channel: "telegram", to: "123" }, + "Cron failed", + ), + ).resolves.toBeUndefined(); + + expect(mocks.warn).toHaveBeenCalledWith( + expect.objectContaining({ + err: "send failed", + channel: "telegram", + to: "123", + }), + "cron: failure destination announce failed", + ); + }); +}); diff --git a/src/cron/delivery.test.ts b/src/cron/delivery.test.ts index 81ab672af57..43eaa215114 100644 --- a/src/cron/delivery.test.ts +++ b/src/cron/delivery.test.ts @@ -148,6 +148,46 @@ describe("resolveFailureDestination", () => { expect(plan).toBeNull(); }); + it("returns null when webhook failure destination matches the primary webhook target", () => { + const plan = resolveFailureDestination( + makeJob({ + sessionTarget: "main", + payload: { kind: "systemEvent", text: "tick" }, + delivery: { + mode: "webhook", + to: "https://example.invalid/cron", + failureDestination: { + mode: "webhook", + to: "https://example.invalid/cron", + }, + }, + }), + undefined, + ); + expect(plan).toBeNull(); + }); + + it("does not reuse inherited announce recipient when switching failure destination to webhook", () => { + const plan = resolveFailureDestination( + makeJob({ + delivery: { + mode: "announce", + channel: "telegram", + to: "111", + failureDestination: { + mode: "webhook", + }, + }, + }), + { + channel: "signal", + to: "group-abc", + mode: "announce", + }, + ); + expect(plan).toBeNull(); + }); + it("allows job-level failure destination fields to clear inherited global values", () => { const plan = resolveFailureDestination( makeJob({ diff --git a/src/cron/isolated-agent.delivery.test-helpers.ts b/src/cron/isolated-agent.delivery.test-helpers.ts index fe6dad727f4..de4caee3a3c 100644 --- a/src/cron/isolated-agent.delivery.test-helpers.ts +++ b/src/cron/isolated-agent.delivery.test-helpers.ts @@ -54,6 +54,7 @@ export async function runTelegramAnnounceTurn(params: { to?: string; bestEffort?: boolean; }; + deliveryContract?: "cron-owned" | "shared"; }): Promise>> { return runCronIsolatedAgentTurn({ cfg: makeCfg(params.home, params.storePath, { @@ -67,5 +68,6 @@ export async function runTelegramAnnounceTurn(params: { message: "do it", sessionKey: "cron:job-1", lane: "cron", + deliveryContract: params.deliveryContract, }); } diff --git a/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts b/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts index 6b2ab85739a..52a3c1328f9 100644 --- a/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts +++ b/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts @@ -23,6 +23,7 @@ async function runExplicitTelegramAnnounceTurn(params: { home: string; storePath: string; deps: CliDeps; + deliveryContract?: "cron-owned" | "shared"; }): Promise>> { return runTelegramAnnounceTurn({ ...params, @@ -301,6 +302,7 @@ describe("runCronIsolatedAgentTurn", () => { home, storePath, deps, + deliveryContract: "shared", }); expectDeliveredOk(res); diff --git a/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts b/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts index f9a7d90a276..9da88bbb4a3 100644 --- a/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts +++ b/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts @@ -10,7 +10,7 @@ * returning so the timer correctly skips the system-event fallback. */ -import { beforeEach, describe, expect, it, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; // --- Module mocks (must be hoisted before imports) --- @@ -105,7 +105,6 @@ function makeBaseParams(overrides: { synthesizedText?: string; deliveryRequested resolvedDelivery, deliveryRequested: overrides.deliveryRequested ?? true, skipHeartbeatDelivery: false, - skipMessagingToolDelivery: false, deliveryBestEffort: false, deliveryPayloadHasStructuredContent: false, deliveryPayloads: overrides.synthesizedText ? [{ text: overrides.synthesizedText }] : [], @@ -134,6 +133,10 @@ describe("dispatchCronDelivery — double-announce guard", () => { vi.mocked(waitForDescendantSubagentSummary).mockResolvedValue(undefined); }); + afterEach(() => { + vi.unstubAllEnvs(); + }); + it("early return (active subagent) sets deliveryAttempted=true so timer skips enqueueSystemEvent", async () => { // countActiveDescendantRuns returns >0 → enters wait block; still >0 after wait → early return vi.mocked(countActiveDescendantRuns).mockReturnValue(2); @@ -255,6 +258,42 @@ describe("dispatchCronDelivery — double-announce guard", () => { expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1); }); + it("retries transient direct announce failures before succeeding", async () => { + vi.stubEnv("OPENCLAW_TEST_FAST", "1"); + vi.mocked(countActiveDescendantRuns).mockReturnValue(0); + vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false); + vi.mocked(deliverOutboundPayloads) + .mockRejectedValueOnce(new Error("ECONNRESET while sending")) + .mockResolvedValueOnce([{ ok: true } as never]); + + const params = makeBaseParams({ synthesizedText: "Retry me once." }); + const state = await dispatchCronDelivery(params); + + expect(state.result).toBeUndefined(); + expect(state.deliveryAttempted).toBe(true); + expect(state.delivered).toBe(true); + expect(deliverOutboundPayloads).toHaveBeenCalledTimes(2); + }); + + it("does not retry permanent direct announce failures", async () => { + vi.stubEnv("OPENCLAW_TEST_FAST", "1"); + vi.mocked(countActiveDescendantRuns).mockReturnValue(0); + vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false); + vi.mocked(deliverOutboundPayloads).mockRejectedValue(new Error("chat not found")); + + const params = makeBaseParams({ synthesizedText: "This should fail once." }); + const state = await dispatchCronDelivery(params); + + expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1); + expect(state.result).toEqual( + expect.objectContaining({ + status: "error", + error: "Error: chat not found", + deliveryAttempted: true, + }), + ); + }); + it("no delivery requested means deliveryAttempted stays false and no delivery is sent", async () => { const params = makeBaseParams({ synthesizedText: "Task done.", diff --git a/src/cron/isolated-agent/delivery-dispatch.named-agent.test.ts b/src/cron/isolated-agent/delivery-dispatch.named-agent.test.ts index 6de82039241..c5d7ec9b41c 100644 --- a/src/cron/isolated-agent/delivery-dispatch.named-agent.test.ts +++ b/src/cron/isolated-agent/delivery-dispatch.named-agent.test.ts @@ -96,4 +96,13 @@ describe("resolveCronDeliveryBestEffort", () => { } as never; expect(resolveCronDeliveryBestEffort(job)).toBe(true); }); + + it("lets explicit delivery.bestEffort=false override legacy payload bestEffortDeliver=true", async () => { + const { resolveCronDeliveryBestEffort } = await import("./delivery-dispatch.js"); + const job = { + delivery: { bestEffort: false }, + payload: { kind: "agentTurn", bestEffortDeliver: true }, + } as never; + expect(resolveCronDeliveryBestEffort(job)).toBe(false); + }); }); diff --git a/src/cron/isolated-agent/delivery-dispatch.ts b/src/cron/isolated-agent/delivery-dispatch.ts index a3a98b245d0..fa9a295a777 100644 --- a/src/cron/isolated-agent/delivery-dispatch.ts +++ b/src/cron/isolated-agent/delivery-dispatch.ts @@ -83,7 +83,7 @@ type DispatchCronDeliveryParams = { resolvedDelivery: DeliveryTargetResolution; deliveryRequested: boolean; skipHeartbeatDelivery: boolean; - skipMessagingToolDelivery: boolean; + skipMessagingToolDelivery?: boolean; deliveryBestEffort: boolean; deliveryPayloadHasStructuredContent: boolean; deliveryPayloads: ReplyPayload[]; @@ -192,15 +192,17 @@ async function retryTransientDirectCronDelivery(params: { export async function dispatchCronDelivery( params: DispatchCronDeliveryParams, ): Promise { + const skipMessagingToolDelivery = params.skipMessagingToolDelivery === true; let summary = params.summary; let outputText = params.outputText; let synthesizedText = params.synthesizedText; let deliveryPayloads = params.deliveryPayloads; - // `true` means we confirmed at least one outbound send reached the target. - // Keep this strict so timer fallback can safely decide whether to wake main. - let delivered = params.skipMessagingToolDelivery; - let deliveryAttempted = params.skipMessagingToolDelivery; + // Shared callers can treat a matching message-tool send as the completed + // delivery path. Cron-owned callers keep this false so direct cron delivery + // remains the only source of delivered state. + let delivered = skipMessagingToolDelivery; + let deliveryAttempted = skipMessagingToolDelivery; const failDeliveryTarget = (error: string) => params.withRunSession({ status: "error", @@ -404,11 +406,7 @@ export async function dispatchCronDelivery( } }; - if ( - params.deliveryRequested && - !params.skipHeartbeatDelivery && - !params.skipMessagingToolDelivery - ) { + if (params.deliveryRequested && !params.skipHeartbeatDelivery && !skipMessagingToolDelivery) { if (!params.resolvedDelivery.ok) { if (!params.deliveryBestEffort) { return { diff --git a/src/cron/isolated-agent/run.message-tool-policy.test.ts b/src/cron/isolated-agent/run.message-tool-policy.test.ts index 360f0794616..2d576900b9d 100644 --- a/src/cron/isolated-agent/run.message-tool-policy.test.ts +++ b/src/cron/isolated-agent/run.message-tool-policy.test.ts @@ -55,7 +55,7 @@ describe("runCronIsolatedAgentTurn message tool policy", () => { restoreFastTestEnv(previousFastTestEnv); }); - it('keeps the message tool enabled when delivery.mode is "none"', async () => { + it('disables the message tool when delivery.mode is "none"', async () => { mockFallbackPassthrough(); resolveCronDeliveryPlanMock.mockReturnValue({ requested: false, @@ -65,7 +65,7 @@ describe("runCronIsolatedAgentTurn message tool policy", () => { await runCronIsolatedAgentTurn(makeParams()); expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1); - expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.disableMessageTool).toBe(false); + expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.disableMessageTool).toBe(true); }); it("disables the message tool when cron delivery is active", async () => { @@ -82,4 +82,20 @@ describe("runCronIsolatedAgentTurn message tool policy", () => { expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1); expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.disableMessageTool).toBe(true); }); + + it("keeps the message tool enabled for shared callers when delivery is not requested", async () => { + mockFallbackPassthrough(); + resolveCronDeliveryPlanMock.mockReturnValue({ + requested: false, + mode: "none", + }); + + await runCronIsolatedAgentTurn({ + ...makeParams(), + deliveryContract: "shared", + }); + + expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1); + expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.disableMessageTool).toBe(false); + }); }); diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index 813b99c0553..5b665b6bf8f 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -78,11 +78,10 @@ export type RunCronAgentTurnResult = { /** Last non-empty agent text output (not truncated). */ outputText?: string; /** - * `true` when the isolated run already delivered its output to the target - * channel (via outbound payloads, the subagent announce flow, or a matching - * messaging-tool send). Callers should skip posting a summary to the main - * session to avoid duplicate - * messages. See: https://github.com/openclaw/openclaw/issues/15692 + * `true` when the isolated runner already handled the run's user-visible + * delivery outcome. Cron-owned callers use this for cron delivery or + * explicit suppression; shared callers may also use it for a matching + * message-tool send that already reached the target. */ delivered?: boolean; /** @@ -144,16 +143,22 @@ function buildCronAgentDefaultsConfig(params: { type ResolvedCronDeliveryTarget = Awaited>; +type IsolatedDeliveryContract = "cron-owned" | "shared"; + function resolveCronToolPolicy(params: { deliveryRequested: boolean; resolvedDelivery: ResolvedCronDeliveryTarget; + deliveryContract: IsolatedDeliveryContract; }) { return { // Only enforce an explicit message target when the cron delivery target // was successfully resolved. When resolution fails the agent should not // be blocked by a target it cannot satisfy (#27898). requireExplicitMessageTarget: params.deliveryRequested && params.resolvedDelivery.ok, - disableMessageTool: params.deliveryRequested, + // Cron-owned runs always route user-facing delivery through the runner + // itself. Shared callers keep the previous behavior so non-cron paths do + // not silently lose the message tool when no explicit delivery is active. + disableMessageTool: params.deliveryContract === "cron-owned" ? true : params.deliveryRequested, }; } @@ -161,6 +166,7 @@ async function resolveCronDeliveryContext(params: { cfg: OpenClawConfig; job: CronJob; agentId: string; + deliveryContract: IsolatedDeliveryContract; }) { const deliveryPlan = resolveCronDeliveryPlan(params.job); const resolvedDelivery = await resolveDeliveryTarget(params.cfg, params.agentId, { @@ -176,6 +182,7 @@ async function resolveCronDeliveryContext(params: { toolPolicy: resolveCronToolPolicy({ deliveryRequested: deliveryPlan.requested, resolvedDelivery, + deliveryContract: params.deliveryContract, }), }; } @@ -200,6 +207,7 @@ export async function runCronIsolatedAgentTurn(params: { sessionKey: string; agentId?: string; lane?: string; + deliveryContract?: IsolatedDeliveryContract; }): Promise { const abortSignal = params.abortSignal ?? params.signal; const isAborted = () => abortSignal?.aborted === true; @@ -210,6 +218,7 @@ export async function runCronIsolatedAgentTurn(params: { : "cron: job execution timed out"; }; const isFastTestEnv = process.env.OPENCLAW_TEST_FAST === "1"; + const deliveryContract = params.deliveryContract ?? "cron-owned"; const defaultAgentId = resolveDefaultAgentId(params.cfg); const requestedAgentId = typeof params.agentId === "string" && params.agentId.trim() @@ -425,6 +434,7 @@ export async function runCronIsolatedAgentTurn(params: { cfg: cfgWithAgentDefaults, job: params.job, agentId, + deliveryContract, }); const { formattedTime, timeLine } = resolveCronStyleNow(params.cfg, now); @@ -807,6 +817,7 @@ export async function runCronIsolatedAgentTurn(params: { const ackMaxChars = resolveHeartbeatAckMaxChars(agentCfg); const skipHeartbeatDelivery = deliveryRequested && isHeartbeatOnlyResponse(payloads, ackMaxChars); const skipMessagingToolDelivery = + deliveryContract === "shared" && deliveryRequested && finalRunResult.didSendViaMessagingTool === true && (finalRunResult.messagingToolSentTargets ?? []).some((target) => @@ -816,7 +827,6 @@ export async function runCronIsolatedAgentTurn(params: { accountId: resolvedDelivery.accountId, }), ); - const deliveryResult = await dispatchCronDelivery({ cfg: params.cfg, cfgWithAgentDefaults, diff --git a/src/cron/service.delivery-plan.test.ts b/src/cron/service.delivery-plan.test.ts index 46c240e6c0f..5168d8bebc9 100644 --- a/src/cron/service.delivery-plan.test.ts +++ b/src/cron/service.delivery-plan.test.ts @@ -86,7 +86,7 @@ describe("CronService delivery plan consistency", () => { }); }); - it("treats delivery object without mode as announce", async () => { + it("treats delivery object without mode as announce without reviving legacy relay fallback", async () => { await withCronService({}, async ({ cron, enqueueSystemEvent }) => { const job = await addIsolatedAgentTurnJob(cron, { name: "partial-delivery", @@ -96,10 +96,8 @@ describe("CronService delivery plan consistency", () => { const result = await cron.run(job.id, "force"); expect(result).toEqual({ ok: true, ran: true }); - expect(enqueueSystemEvent).toHaveBeenCalledWith( - "Cron: done", - expect.objectContaining({ agentId: undefined }), - ); + expect(enqueueSystemEvent).not.toHaveBeenCalled(); + expect(cron.getJob(job.id)?.state.lastDeliveryStatus).toBe("unknown"); }); }); diff --git a/src/cron/service.heartbeat-ok-summary-suppressed.test.ts b/src/cron/service.heartbeat-ok-summary-suppressed.test.ts index 3ae9fc7c758..d2a620e1439 100644 --- a/src/cron/service.heartbeat-ok-summary-suppressed.test.ts +++ b/src/cron/service.heartbeat-ok-summary-suppressed.test.ts @@ -86,7 +86,7 @@ describe("cron isolated job HEARTBEAT_OK summary suppression (#32013)", () => { expect(requestHeartbeatNow).not.toHaveBeenCalled(); }); - it("still enqueues real cron summaries as system events", async () => { + it("does not revive legacy main-session relay for real cron summaries", async () => { const { storePath } = await makeStorePath(); const now = Date.now(); @@ -109,10 +109,7 @@ describe("cron isolated job HEARTBEAT_OK summary suppression (#32013)", () => { await runScheduledCron(cron); - // Real summaries SHOULD be enqueued. - expect(enqueueSystemEvent).toHaveBeenCalledWith( - expect.stringContaining("Weather update"), - expect.objectContaining({ agentId: undefined }), - ); + expect(enqueueSystemEvent).not.toHaveBeenCalled(); + expect(requestHeartbeatNow).not.toHaveBeenCalled(); }); }); diff --git a/src/cron/service.runs-one-shot-main-job-disables-it.test.ts b/src/cron/service.runs-one-shot-main-job-disables-it.test.ts index deac4a5b668..555750bd738 100644 --- a/src/cron/service.runs-one-shot-main-job-disables-it.test.ts +++ b/src/cron/service.runs-one-shot-main-job-disables-it.test.ts @@ -620,14 +620,14 @@ describe("CronService", () => { await stopCronAndCleanup(cron, store); }); - it("runs an isolated job and posts summary to main", async () => { + it("runs an isolated job without posting a fallback summary to main", async () => { const runIsolatedAgentJob = vi.fn(async () => ({ status: "ok" as const, summary: "done" })); const { store, cron, enqueueSystemEvent, requestHeartbeatNow, events } = await createIsolatedAnnounceHarness(runIsolatedAgentJob); await runIsolatedAnnounceScenario({ cron, events, name: "weekly" }); expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1); - expectMainSystemEventPosted(enqueueSystemEvent, "Cron: done"); - expect(requestHeartbeatNow).toHaveBeenCalled(); + expect(enqueueSystemEvent).not.toHaveBeenCalled(); + expect(requestHeartbeatNow).not.toHaveBeenCalled(); await stopCronAndCleanup(cron, store); }); @@ -685,7 +685,7 @@ describe("CronService", () => { await stopCronAndCleanup(cron, store); }); - it("posts last output to main even when isolated job errors", async () => { + it("does not post a fallback main summary when an isolated job errors", async () => { const runIsolatedAgentJob = vi.fn(async () => ({ status: "error" as const, summary: "last output", @@ -700,8 +700,8 @@ describe("CronService", () => { status: "error", }); - expectMainSystemEventPosted(enqueueSystemEvent, "Cron (error): last output"); - expect(requestHeartbeatNow).toHaveBeenCalled(); + expect(enqueueSystemEvent).not.toHaveBeenCalled(); + expect(requestHeartbeatNow).not.toHaveBeenCalled(); await stopCronAndCleanup(cron, store); }); diff --git a/src/cron/service/store.ts b/src/cron/service/store.ts index 2c40ac50643..d1d36e48e08 100644 --- a/src/cron/service/store.ts +++ b/src/cron/service/store.ts @@ -1,161 +1,10 @@ import fs from "node:fs"; -import { normalizeLegacyDeliveryInput } from "../legacy-delivery.js"; -import { parseAbsoluteTimeMs } from "../parse.js"; -import { migrateLegacyCronPayload } from "../payload-migration.js"; -import { coerceFiniteScheduleNumber } from "../schedule.js"; -import { normalizeCronStaggerMs, resolveDefaultCronStaggerMs } from "../stagger.js"; +import { normalizeStoredCronJobs } from "../store-migration.js"; import { loadCronStore, saveCronStore } from "../store.js"; import type { CronJob } from "../types.js"; import { recomputeNextRuns } from "./jobs.js"; -import { inferLegacyName, normalizeOptionalText } from "./normalize.js"; import type { CronServiceState } from "./state.js"; -function normalizePayloadKind(payload: Record) { - const raw = typeof payload.kind === "string" ? payload.kind.trim().toLowerCase() : ""; - if (raw === "agentturn") { - payload.kind = "agentTurn"; - return true; - } - if (raw === "systemevent") { - payload.kind = "systemEvent"; - return true; - } - return false; -} - -function inferPayloadIfMissing(raw: Record) { - const message = typeof raw.message === "string" ? raw.message.trim() : ""; - const text = typeof raw.text === "string" ? raw.text.trim() : ""; - const command = typeof raw.command === "string" ? raw.command.trim() : ""; - if (message) { - raw.payload = { kind: "agentTurn", message }; - return true; - } - if (text) { - raw.payload = { kind: "systemEvent", text }; - return true; - } - if (command) { - raw.payload = { kind: "systemEvent", text: command }; - return true; - } - return false; -} - -function copyTopLevelAgentTurnFields( - raw: Record, - payload: Record, -) { - let mutated = false; - - const copyTrimmedString = (field: "model" | "thinking") => { - const existing = payload[field]; - if (typeof existing === "string" && existing.trim()) { - return; - } - const value = raw[field]; - if (typeof value === "string" && value.trim()) { - payload[field] = value.trim(); - mutated = true; - } - }; - copyTrimmedString("model"); - copyTrimmedString("thinking"); - - if ( - typeof payload.timeoutSeconds !== "number" && - typeof raw.timeoutSeconds === "number" && - Number.isFinite(raw.timeoutSeconds) - ) { - payload.timeoutSeconds = Math.max(0, Math.floor(raw.timeoutSeconds)); - mutated = true; - } - - if ( - typeof payload.allowUnsafeExternalContent !== "boolean" && - typeof raw.allowUnsafeExternalContent === "boolean" - ) { - payload.allowUnsafeExternalContent = raw.allowUnsafeExternalContent; - mutated = true; - } - - if (typeof payload.deliver !== "boolean" && typeof raw.deliver === "boolean") { - payload.deliver = raw.deliver; - mutated = true; - } - if ( - typeof payload.channel !== "string" && - typeof raw.channel === "string" && - raw.channel.trim() - ) { - payload.channel = raw.channel.trim(); - mutated = true; - } - if (typeof payload.to !== "string" && typeof raw.to === "string" && raw.to.trim()) { - payload.to = raw.to.trim(); - mutated = true; - } - if ( - typeof payload.bestEffortDeliver !== "boolean" && - typeof raw.bestEffortDeliver === "boolean" - ) { - payload.bestEffortDeliver = raw.bestEffortDeliver; - mutated = true; - } - if ( - typeof payload.provider !== "string" && - typeof raw.provider === "string" && - raw.provider.trim() - ) { - payload.provider = raw.provider.trim(); - mutated = true; - } - - return mutated; -} - -function stripLegacyTopLevelFields(raw: Record) { - if ("model" in raw) { - delete raw.model; - } - if ("thinking" in raw) { - delete raw.thinking; - } - if ("timeoutSeconds" in raw) { - delete raw.timeoutSeconds; - } - if ("allowUnsafeExternalContent" in raw) { - delete raw.allowUnsafeExternalContent; - } - if ("message" in raw) { - delete raw.message; - } - if ("text" in raw) { - delete raw.text; - } - if ("deliver" in raw) { - delete raw.deliver; - } - if ("channel" in raw) { - delete raw.channel; - } - if ("to" in raw) { - delete raw.to; - } - if ("bestEffortDeliver" in raw) { - delete raw.bestEffortDeliver; - } - if ("provider" in raw) { - delete raw.provider; - } - if ("command" in raw) { - delete raw.command; - } - if ("timeout" in raw) { - delete raw.timeout; - } -} - async function getFileMtimeMs(path: string): Promise { try { const stats = await fs.promises.stat(path); @@ -185,287 +34,7 @@ export async function ensureLoaded( const fileMtimeMs = await getFileMtimeMs(state.deps.storePath); const loaded = await loadCronStore(state.deps.storePath); const jobs = (loaded.jobs ?? []) as unknown as Array>; - let mutated = false; - for (const raw of jobs) { - const state = raw.state; - if (!state || typeof state !== "object" || Array.isArray(state)) { - raw.state = {}; - mutated = true; - } - - const rawId = typeof raw.id === "string" ? raw.id.trim() : ""; - const legacyJobId = typeof raw.jobId === "string" ? raw.jobId.trim() : ""; - if (!rawId && legacyJobId) { - raw.id = legacyJobId; - mutated = true; - } else if (rawId && raw.id !== rawId) { - raw.id = rawId; - mutated = true; - } - if ("jobId" in raw) { - delete raw.jobId; - mutated = true; - } - - if (typeof raw.schedule === "string") { - const expr = raw.schedule.trim(); - raw.schedule = { kind: "cron", expr }; - mutated = true; - } - - const nameRaw = raw.name; - if (typeof nameRaw !== "string" || nameRaw.trim().length === 0) { - raw.name = inferLegacyName({ - schedule: raw.schedule as never, - payload: raw.payload as never, - }); - mutated = true; - } else { - raw.name = nameRaw.trim(); - } - - const desc = normalizeOptionalText(raw.description); - if (raw.description !== desc) { - raw.description = desc; - mutated = true; - } - - if ("sessionKey" in raw) { - const sessionKey = - typeof raw.sessionKey === "string" ? normalizeOptionalText(raw.sessionKey) : undefined; - if (raw.sessionKey !== sessionKey) { - raw.sessionKey = sessionKey; - mutated = true; - } - } - - if (typeof raw.enabled !== "boolean") { - raw.enabled = true; - mutated = true; - } - - const wakeModeRaw = typeof raw.wakeMode === "string" ? raw.wakeMode.trim().toLowerCase() : ""; - if (wakeModeRaw === "next-heartbeat") { - if (raw.wakeMode !== "next-heartbeat") { - raw.wakeMode = "next-heartbeat"; - mutated = true; - } - } else if (wakeModeRaw === "now") { - if (raw.wakeMode !== "now") { - raw.wakeMode = "now"; - mutated = true; - } - } else { - raw.wakeMode = "now"; - mutated = true; - } - - const payload = raw.payload; - if ( - (!payload || typeof payload !== "object" || Array.isArray(payload)) && - inferPayloadIfMissing(raw) - ) { - mutated = true; - } - - const payloadRecord = - raw.payload && typeof raw.payload === "object" && !Array.isArray(raw.payload) - ? (raw.payload as Record) - : null; - - if (payloadRecord) { - if (normalizePayloadKind(payloadRecord)) { - mutated = true; - } - if (!payloadRecord.kind) { - if (typeof payloadRecord.message === "string" && payloadRecord.message.trim()) { - payloadRecord.kind = "agentTurn"; - mutated = true; - } else if (typeof payloadRecord.text === "string" && payloadRecord.text.trim()) { - payloadRecord.kind = "systemEvent"; - mutated = true; - } - } - if (payloadRecord.kind === "agentTurn") { - if (copyTopLevelAgentTurnFields(raw, payloadRecord)) { - mutated = true; - } - } - } - - const hadLegacyTopLevelFields = - "model" in raw || - "thinking" in raw || - "timeoutSeconds" in raw || - "allowUnsafeExternalContent" in raw || - "message" in raw || - "text" in raw || - "deliver" in raw || - "channel" in raw || - "to" in raw || - "bestEffortDeliver" in raw || - "provider" in raw || - "command" in raw || - "timeout" in raw; - if (hadLegacyTopLevelFields) { - stripLegacyTopLevelFields(raw); - mutated = true; - } - - if (payloadRecord) { - if (migrateLegacyCronPayload(payloadRecord)) { - mutated = true; - } - } - - const schedule = raw.schedule; - if (schedule && typeof schedule === "object" && !Array.isArray(schedule)) { - const sched = schedule as Record; - const kind = typeof sched.kind === "string" ? sched.kind.trim().toLowerCase() : ""; - if (!kind && ("at" in sched || "atMs" in sched)) { - sched.kind = "at"; - mutated = true; - } - const atRaw = typeof sched.at === "string" ? sched.at.trim() : ""; - const atMsRaw = sched.atMs; - const parsedAtMs = - typeof atMsRaw === "number" - ? atMsRaw - : typeof atMsRaw === "string" - ? parseAbsoluteTimeMs(atMsRaw) - : atRaw - ? parseAbsoluteTimeMs(atRaw) - : null; - if (parsedAtMs !== null) { - sched.at = new Date(parsedAtMs).toISOString(); - if ("atMs" in sched) { - delete sched.atMs; - } - mutated = true; - } - - const everyMsRaw = sched.everyMs; - const everyMsCoerced = coerceFiniteScheduleNumber(everyMsRaw); - const everyMs = everyMsCoerced !== undefined ? Math.floor(everyMsCoerced) : null; - if (everyMs !== null && everyMsRaw !== everyMs) { - sched.everyMs = everyMs; - mutated = true; - } - if ((kind === "every" || sched.kind === "every") && everyMs !== null) { - const anchorRaw = sched.anchorMs; - const anchorCoerced = coerceFiniteScheduleNumber(anchorRaw); - const normalizedAnchor = - anchorCoerced !== undefined - ? Math.max(0, Math.floor(anchorCoerced)) - : typeof raw.createdAtMs === "number" && Number.isFinite(raw.createdAtMs) - ? Math.max(0, Math.floor(raw.createdAtMs)) - : typeof raw.updatedAtMs === "number" && Number.isFinite(raw.updatedAtMs) - ? Math.max(0, Math.floor(raw.updatedAtMs)) - : null; - if (normalizedAnchor !== null && anchorRaw !== normalizedAnchor) { - sched.anchorMs = normalizedAnchor; - mutated = true; - } - } - - const exprRaw = typeof sched.expr === "string" ? sched.expr.trim() : ""; - const legacyCronRaw = typeof sched.cron === "string" ? sched.cron.trim() : ""; - let normalizedExpr = exprRaw; - if (!normalizedExpr && legacyCronRaw) { - normalizedExpr = legacyCronRaw; - sched.expr = normalizedExpr; - mutated = true; - } - if (typeof sched.expr === "string" && sched.expr !== normalizedExpr) { - sched.expr = normalizedExpr; - mutated = true; - } - if ("cron" in sched) { - delete sched.cron; - mutated = true; - } - if ((kind === "cron" || sched.kind === "cron") && normalizedExpr) { - const explicitStaggerMs = normalizeCronStaggerMs(sched.staggerMs); - const defaultStaggerMs = resolveDefaultCronStaggerMs(normalizedExpr); - const targetStaggerMs = explicitStaggerMs ?? defaultStaggerMs; - if (targetStaggerMs === undefined) { - if ("staggerMs" in sched) { - delete sched.staggerMs; - mutated = true; - } - } else if (sched.staggerMs !== targetStaggerMs) { - sched.staggerMs = targetStaggerMs; - mutated = true; - } - } - } - - const delivery = raw.delivery; - if (delivery && typeof delivery === "object" && !Array.isArray(delivery)) { - const modeRaw = (delivery as { mode?: unknown }).mode; - if (typeof modeRaw === "string") { - const lowered = modeRaw.trim().toLowerCase(); - if (lowered === "deliver") { - (delivery as { mode?: unknown }).mode = "announce"; - mutated = true; - } - } else if (modeRaw === undefined || modeRaw === null) { - // Explicitly persist the default so existing jobs don't silently - // change behaviour when the runtime default shifts. - (delivery as { mode?: unknown }).mode = "announce"; - mutated = true; - } - } - - const isolation = raw.isolation; - if (isolation && typeof isolation === "object" && !Array.isArray(isolation)) { - delete raw.isolation; - mutated = true; - } - - const payloadKind = - payloadRecord && typeof payloadRecord.kind === "string" ? payloadRecord.kind : ""; - const normalizedSessionTarget = - typeof raw.sessionTarget === "string" ? raw.sessionTarget.trim().toLowerCase() : ""; - if (normalizedSessionTarget === "main" || normalizedSessionTarget === "isolated") { - if (raw.sessionTarget !== normalizedSessionTarget) { - raw.sessionTarget = normalizedSessionTarget; - mutated = true; - } - } else { - const inferredSessionTarget = payloadKind === "agentTurn" ? "isolated" : "main"; - if (raw.sessionTarget !== inferredSessionTarget) { - raw.sessionTarget = inferredSessionTarget; - mutated = true; - } - } - - const sessionTarget = - typeof raw.sessionTarget === "string" ? raw.sessionTarget.trim().toLowerCase() : ""; - const isIsolatedAgentTurn = - sessionTarget === "isolated" || (sessionTarget === "" && payloadKind === "agentTurn"); - const hasDelivery = delivery && typeof delivery === "object" && !Array.isArray(delivery); - const normalizedLegacy = normalizeLegacyDeliveryInput({ - delivery: hasDelivery ? (delivery as Record) : null, - payload: payloadRecord, - }); - - if (isIsolatedAgentTurn && payloadKind === "agentTurn") { - if (!hasDelivery && normalizedLegacy.delivery) { - raw.delivery = normalizedLegacy.delivery; - mutated = true; - } else if (!hasDelivery) { - raw.delivery = { mode: "announce" }; - mutated = true; - } else if (normalizedLegacy.mutated && normalizedLegacy.delivery) { - raw.delivery = normalizedLegacy.delivery; - mutated = true; - } - } else if (normalizedLegacy.mutated && normalizedLegacy.delivery) { - raw.delivery = normalizedLegacy.delivery; - mutated = true; - } - } + const { mutated } = normalizeStoredCronJobs(jobs); state.store = { version: 1, jobs: jobs as unknown as CronJob[] }; state.storeLoadedAtMs = state.deps.nowMs(); state.storeFileMtimeMs = fileMtimeMs; diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index f82290006b4..5320ffdf526 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -1,9 +1,7 @@ import type { CronConfig, CronRetryOn } from "../../config/types.cron.js"; -import { isCronSystemEvent } from "../../infra/heartbeat-events-filter.js"; import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js"; import { DEFAULT_AGENT_ID } from "../../routing/session-key.js"; import { resolveCronDeliveryPlan } from "../delivery.js"; -import { shouldEnqueueCronMainSummary } from "../heartbeat-policy.js"; import { sweepCronRunSessions } from "../session-reaper.js"; import type { CronDeliveryStatus, @@ -1138,46 +1136,6 @@ export async function executeJobCore( return { status: "error", error: timeoutErrorMessage() }; } - // Post a short summary back to the main session only when announce - // delivery was requested and we are confident no outbound delivery path - // ran. If delivery was attempted but final ack is uncertain, suppress the - // main summary to avoid duplicate user-facing sends. - // See: https://github.com/openclaw/openclaw/issues/15692 - // - // Also suppress heartbeat-only summaries (e.g. "HEARTBEAT_OK") — these - // are internal ack tokens that should never leak into user conversations. - // See: https://github.com/openclaw/openclaw/issues/32013 - const summaryText = res.summary?.trim(); - const deliveryPlan = resolveCronDeliveryPlan(job); - const suppressMainSummary = - res.status === "error" && res.errorKind === "delivery-target" && deliveryPlan.requested; - if ( - shouldEnqueueCronMainSummary({ - summaryText, - deliveryRequested: deliveryPlan.requested, - delivered: res.delivered, - deliveryAttempted: res.deliveryAttempted, - suppressMainSummary, - isCronSystemEvent, - }) - ) { - const prefix = "Cron"; - const label = - res.status === "error" ? `${prefix} (error): ${summaryText}` : `${prefix}: ${summaryText}`; - state.deps.enqueueSystemEvent(label, { - agentId: job.agentId, - sessionKey: job.sessionKey, - contextKey: `cron:${job.id}`, - }); - if (job.wakeMode === "now") { - state.deps.requestHeartbeatNow({ - reason: `cron:${job.id}`, - agentId: job.agentId, - sessionKey: job.sessionKey, - }); - } - } - return { status: res.status, error: res.error, diff --git a/src/cron/store-migration.test.ts b/src/cron/store-migration.test.ts new file mode 100644 index 00000000000..79f3314c019 --- /dev/null +++ b/src/cron/store-migration.test.ts @@ -0,0 +1,78 @@ +import { describe, expect, it } from "vitest"; +import { normalizeStoredCronJobs } from "./store-migration.js"; + +describe("normalizeStoredCronJobs", () => { + it("normalizes legacy cron fields and reports migration issues", () => { + const jobs = [ + { + jobId: "legacy-job", + schedule: { kind: "cron", cron: "*/5 * * * *", tz: "UTC" }, + message: "say hi", + model: "openai/gpt-4.1", + deliver: true, + provider: " TeLeGrAm ", + to: "12345", + }, + ] as Array>; + + const result = normalizeStoredCronJobs(jobs); + + expect(result.mutated).toBe(true); + expect(result.issues).toMatchObject({ + jobId: 1, + legacyScheduleCron: 1, + legacyTopLevelPayloadFields: 1, + legacyTopLevelDeliveryFields: 1, + }); + + const [job] = jobs; + expect(job?.jobId).toBeUndefined(); + expect(job?.id).toBe("legacy-job"); + expect(job?.schedule).toMatchObject({ + kind: "cron", + expr: "*/5 * * * *", + tz: "UTC", + }); + expect(job?.message).toBeUndefined(); + expect(job?.provider).toBeUndefined(); + expect(job?.delivery).toMatchObject({ + mode: "announce", + channel: "telegram", + to: "12345", + }); + expect(job?.payload).toMatchObject({ + kind: "agentTurn", + message: "say hi", + model: "openai/gpt-4.1", + }); + }); + + it("normalizes payload provider alias into channel", () => { + const jobs = [ + { + id: "legacy-provider", + schedule: { kind: "every", everyMs: 60_000 }, + payload: { + kind: "agentTurn", + message: "ping", + provider: " Slack ", + }, + }, + ] as Array>; + + const result = normalizeStoredCronJobs(jobs); + + expect(result.mutated).toBe(true); + expect(result.issues.legacyPayloadProvider).toBe(1); + expect(jobs[0]?.payload).toMatchObject({ + kind: "agentTurn", + message: "ping", + }); + const payload = jobs[0]?.payload as Record | undefined; + expect(payload?.provider).toBeUndefined(); + expect(jobs[0]?.delivery).toMatchObject({ + mode: "announce", + channel: "slack", + }); + }); +}); diff --git a/src/cron/store-migration.ts b/src/cron/store-migration.ts new file mode 100644 index 00000000000..11789422e61 --- /dev/null +++ b/src/cron/store-migration.ts @@ -0,0 +1,491 @@ +import { normalizeLegacyDeliveryInput } from "./legacy-delivery.js"; +import { parseAbsoluteTimeMs } from "./parse.js"; +import { migrateLegacyCronPayload } from "./payload-migration.js"; +import { coerceFiniteScheduleNumber } from "./schedule.js"; +import { inferLegacyName, normalizeOptionalText } from "./service/normalize.js"; +import { normalizeCronStaggerMs, resolveDefaultCronStaggerMs } from "./stagger.js"; + +type CronStoreIssueKey = + | "jobId" + | "legacyScheduleString" + | "legacyScheduleCron" + | "legacyPayloadKind" + | "legacyPayloadProvider" + | "legacyTopLevelPayloadFields" + | "legacyTopLevelDeliveryFields" + | "legacyDeliveryMode"; + +type CronStoreIssues = Partial>; + +type NormalizeCronStoreJobsResult = { + issues: CronStoreIssues; + jobs: Array>; + mutated: boolean; +}; + +function incrementIssue(issues: CronStoreIssues, key: CronStoreIssueKey) { + issues[key] = (issues[key] ?? 0) + 1; +} + +function normalizePayloadKind(payload: Record) { + const raw = typeof payload.kind === "string" ? payload.kind.trim().toLowerCase() : ""; + if (raw === "agentturn") { + payload.kind = "agentTurn"; + return true; + } + if (raw === "systemevent") { + payload.kind = "systemEvent"; + return true; + } + return false; +} + +function inferPayloadIfMissing(raw: Record) { + const message = typeof raw.message === "string" ? raw.message.trim() : ""; + const text = typeof raw.text === "string" ? raw.text.trim() : ""; + const command = typeof raw.command === "string" ? raw.command.trim() : ""; + if (message) { + raw.payload = { kind: "agentTurn", message }; + return true; + } + if (text) { + raw.payload = { kind: "systemEvent", text }; + return true; + } + if (command) { + raw.payload = { kind: "systemEvent", text: command }; + return true; + } + return false; +} + +function copyTopLevelAgentTurnFields( + raw: Record, + payload: Record, +) { + let mutated = false; + + const copyTrimmedString = (field: "model" | "thinking") => { + const existing = payload[field]; + if (typeof existing === "string" && existing.trim()) { + return; + } + const value = raw[field]; + if (typeof value === "string" && value.trim()) { + payload[field] = value.trim(); + mutated = true; + } + }; + copyTrimmedString("model"); + copyTrimmedString("thinking"); + + if ( + typeof payload.timeoutSeconds !== "number" && + typeof raw.timeoutSeconds === "number" && + Number.isFinite(raw.timeoutSeconds) + ) { + payload.timeoutSeconds = Math.max(0, Math.floor(raw.timeoutSeconds)); + mutated = true; + } + + if ( + typeof payload.allowUnsafeExternalContent !== "boolean" && + typeof raw.allowUnsafeExternalContent === "boolean" + ) { + payload.allowUnsafeExternalContent = raw.allowUnsafeExternalContent; + mutated = true; + } + + if (typeof payload.deliver !== "boolean" && typeof raw.deliver === "boolean") { + payload.deliver = raw.deliver; + mutated = true; + } + if ( + typeof payload.channel !== "string" && + typeof raw.channel === "string" && + raw.channel.trim() + ) { + payload.channel = raw.channel.trim(); + mutated = true; + } + if (typeof payload.to !== "string" && typeof raw.to === "string" && raw.to.trim()) { + payload.to = raw.to.trim(); + mutated = true; + } + if ( + typeof payload.bestEffortDeliver !== "boolean" && + typeof raw.bestEffortDeliver === "boolean" + ) { + payload.bestEffortDeliver = raw.bestEffortDeliver; + mutated = true; + } + if ( + typeof payload.provider !== "string" && + typeof raw.provider === "string" && + raw.provider.trim() + ) { + payload.provider = raw.provider.trim(); + mutated = true; + } + + return mutated; +} + +function stripLegacyTopLevelFields(raw: Record) { + if ("model" in raw) { + delete raw.model; + } + if ("thinking" in raw) { + delete raw.thinking; + } + if ("timeoutSeconds" in raw) { + delete raw.timeoutSeconds; + } + if ("allowUnsafeExternalContent" in raw) { + delete raw.allowUnsafeExternalContent; + } + if ("message" in raw) { + delete raw.message; + } + if ("text" in raw) { + delete raw.text; + } + if ("deliver" in raw) { + delete raw.deliver; + } + if ("channel" in raw) { + delete raw.channel; + } + if ("to" in raw) { + delete raw.to; + } + if ("bestEffortDeliver" in raw) { + delete raw.bestEffortDeliver; + } + if ("provider" in raw) { + delete raw.provider; + } + if ("command" in raw) { + delete raw.command; + } + if ("timeout" in raw) { + delete raw.timeout; + } +} + +export function normalizeStoredCronJobs( + jobs: Array>, +): NormalizeCronStoreJobsResult { + const issues: CronStoreIssues = {}; + let mutated = false; + + for (const raw of jobs) { + const jobIssues = new Set(); + const trackIssue = (key: CronStoreIssueKey) => { + if (jobIssues.has(key)) { + return; + } + jobIssues.add(key); + incrementIssue(issues, key); + }; + + const state = raw.state; + if (!state || typeof state !== "object" || Array.isArray(state)) { + raw.state = {}; + mutated = true; + } + + const rawId = typeof raw.id === "string" ? raw.id.trim() : ""; + const legacyJobId = typeof raw.jobId === "string" ? raw.jobId.trim() : ""; + if (!rawId && legacyJobId) { + raw.id = legacyJobId; + mutated = true; + trackIssue("jobId"); + } else if (rawId && raw.id !== rawId) { + raw.id = rawId; + mutated = true; + } + if ("jobId" in raw) { + delete raw.jobId; + mutated = true; + trackIssue("jobId"); + } + + if (typeof raw.schedule === "string") { + const expr = raw.schedule.trim(); + raw.schedule = { kind: "cron", expr }; + mutated = true; + trackIssue("legacyScheduleString"); + } + + const nameRaw = raw.name; + if (typeof nameRaw !== "string" || nameRaw.trim().length === 0) { + raw.name = inferLegacyName({ + schedule: raw.schedule as never, + payload: raw.payload as never, + }); + mutated = true; + } else { + raw.name = nameRaw.trim(); + } + + const desc = normalizeOptionalText(raw.description); + if (raw.description !== desc) { + raw.description = desc; + mutated = true; + } + + if ("sessionKey" in raw) { + const sessionKey = + typeof raw.sessionKey === "string" ? normalizeOptionalText(raw.sessionKey) : undefined; + if (raw.sessionKey !== sessionKey) { + raw.sessionKey = sessionKey; + mutated = true; + } + } + + if (typeof raw.enabled !== "boolean") { + raw.enabled = true; + mutated = true; + } + + const wakeModeRaw = typeof raw.wakeMode === "string" ? raw.wakeMode.trim().toLowerCase() : ""; + if (wakeModeRaw === "next-heartbeat") { + if (raw.wakeMode !== "next-heartbeat") { + raw.wakeMode = "next-heartbeat"; + mutated = true; + } + } else if (wakeModeRaw === "now") { + if (raw.wakeMode !== "now") { + raw.wakeMode = "now"; + mutated = true; + } + } else { + raw.wakeMode = "now"; + mutated = true; + } + + const payload = raw.payload; + if ( + (!payload || typeof payload !== "object" || Array.isArray(payload)) && + inferPayloadIfMissing(raw) + ) { + mutated = true; + trackIssue("legacyTopLevelPayloadFields"); + } + + const payloadRecord = + raw.payload && typeof raw.payload === "object" && !Array.isArray(raw.payload) + ? (raw.payload as Record) + : null; + + if (payloadRecord) { + if (normalizePayloadKind(payloadRecord)) { + mutated = true; + trackIssue("legacyPayloadKind"); + } + if (!payloadRecord.kind) { + if (typeof payloadRecord.message === "string" && payloadRecord.message.trim()) { + payloadRecord.kind = "agentTurn"; + mutated = true; + trackIssue("legacyPayloadKind"); + } else if (typeof payloadRecord.text === "string" && payloadRecord.text.trim()) { + payloadRecord.kind = "systemEvent"; + mutated = true; + trackIssue("legacyPayloadKind"); + } + } + if (payloadRecord.kind === "agentTurn" && copyTopLevelAgentTurnFields(raw, payloadRecord)) { + mutated = true; + } + } + + const hadLegacyTopLevelPayloadFields = + "model" in raw || + "thinking" in raw || + "timeoutSeconds" in raw || + "allowUnsafeExternalContent" in raw || + "message" in raw || + "text" in raw || + "command" in raw || + "timeout" in raw; + const hadLegacyTopLevelDeliveryFields = + "deliver" in raw || + "channel" in raw || + "to" in raw || + "bestEffortDeliver" in raw || + "provider" in raw; + if (hadLegacyTopLevelPayloadFields || hadLegacyTopLevelDeliveryFields) { + stripLegacyTopLevelFields(raw); + mutated = true; + if (hadLegacyTopLevelPayloadFields) { + trackIssue("legacyTopLevelPayloadFields"); + } + if (hadLegacyTopLevelDeliveryFields) { + trackIssue("legacyTopLevelDeliveryFields"); + } + } + + if (payloadRecord) { + const hadLegacyPayloadProvider = + typeof payloadRecord.provider === "string" && payloadRecord.provider.trim().length > 0; + if (migrateLegacyCronPayload(payloadRecord)) { + mutated = true; + if (hadLegacyPayloadProvider) { + trackIssue("legacyPayloadProvider"); + } + } + } + + const schedule = raw.schedule; + if (schedule && typeof schedule === "object" && !Array.isArray(schedule)) { + const sched = schedule as Record; + const kind = typeof sched.kind === "string" ? sched.kind.trim().toLowerCase() : ""; + if (!kind && ("at" in sched || "atMs" in sched)) { + sched.kind = "at"; + mutated = true; + } + const atRaw = typeof sched.at === "string" ? sched.at.trim() : ""; + const atMsRaw = sched.atMs; + const parsedAtMs = + typeof atMsRaw === "number" + ? atMsRaw + : typeof atMsRaw === "string" + ? parseAbsoluteTimeMs(atMsRaw) + : atRaw + ? parseAbsoluteTimeMs(atRaw) + : null; + if (parsedAtMs !== null) { + sched.at = new Date(parsedAtMs).toISOString(); + if ("atMs" in sched) { + delete sched.atMs; + } + mutated = true; + } + + const everyMsRaw = sched.everyMs; + const everyMsCoerced = coerceFiniteScheduleNumber(everyMsRaw); + const everyMs = everyMsCoerced !== undefined ? Math.floor(everyMsCoerced) : null; + if (everyMs !== null && everyMsRaw !== everyMs) { + sched.everyMs = everyMs; + mutated = true; + } + if ((kind === "every" || sched.kind === "every") && everyMs !== null) { + const anchorRaw = sched.anchorMs; + const anchorCoerced = coerceFiniteScheduleNumber(anchorRaw); + const normalizedAnchor = + anchorCoerced !== undefined + ? Math.max(0, Math.floor(anchorCoerced)) + : typeof raw.createdAtMs === "number" && Number.isFinite(raw.createdAtMs) + ? Math.max(0, Math.floor(raw.createdAtMs)) + : typeof raw.updatedAtMs === "number" && Number.isFinite(raw.updatedAtMs) + ? Math.max(0, Math.floor(raw.updatedAtMs)) + : null; + if (normalizedAnchor !== null && anchorRaw !== normalizedAnchor) { + sched.anchorMs = normalizedAnchor; + mutated = true; + } + } + + const exprRaw = typeof sched.expr === "string" ? sched.expr.trim() : ""; + const legacyCronRaw = typeof sched.cron === "string" ? sched.cron.trim() : ""; + let normalizedExpr = exprRaw; + if (!normalizedExpr && legacyCronRaw) { + normalizedExpr = legacyCronRaw; + sched.expr = normalizedExpr; + mutated = true; + trackIssue("legacyScheduleCron"); + } + if (typeof sched.expr === "string" && sched.expr !== normalizedExpr) { + sched.expr = normalizedExpr; + mutated = true; + } + if ("cron" in sched) { + delete sched.cron; + mutated = true; + trackIssue("legacyScheduleCron"); + } + if ((kind === "cron" || sched.kind === "cron") && normalizedExpr) { + const explicitStaggerMs = normalizeCronStaggerMs(sched.staggerMs); + const defaultStaggerMs = resolveDefaultCronStaggerMs(normalizedExpr); + const targetStaggerMs = explicitStaggerMs ?? defaultStaggerMs; + if (targetStaggerMs === undefined) { + if ("staggerMs" in sched) { + delete sched.staggerMs; + mutated = true; + } + } else if (sched.staggerMs !== targetStaggerMs) { + sched.staggerMs = targetStaggerMs; + mutated = true; + } + } + } + + const delivery = raw.delivery; + if (delivery && typeof delivery === "object" && !Array.isArray(delivery)) { + const modeRaw = (delivery as { mode?: unknown }).mode; + if (typeof modeRaw === "string") { + const lowered = modeRaw.trim().toLowerCase(); + if (lowered === "deliver") { + (delivery as { mode?: unknown }).mode = "announce"; + mutated = true; + trackIssue("legacyDeliveryMode"); + } + } else if (modeRaw === undefined || modeRaw === null) { + (delivery as { mode?: unknown }).mode = "announce"; + mutated = true; + } + } + + const isolation = raw.isolation; + if (isolation && typeof isolation === "object" && !Array.isArray(isolation)) { + delete raw.isolation; + mutated = true; + } + + const payloadKind = + payloadRecord && typeof payloadRecord.kind === "string" ? payloadRecord.kind : ""; + const normalizedSessionTarget = + typeof raw.sessionTarget === "string" ? raw.sessionTarget.trim().toLowerCase() : ""; + if (normalizedSessionTarget === "main" || normalizedSessionTarget === "isolated") { + if (raw.sessionTarget !== normalizedSessionTarget) { + raw.sessionTarget = normalizedSessionTarget; + mutated = true; + } + } else { + const inferredSessionTarget = payloadKind === "agentTurn" ? "isolated" : "main"; + if (raw.sessionTarget !== inferredSessionTarget) { + raw.sessionTarget = inferredSessionTarget; + mutated = true; + } + } + + const sessionTarget = + typeof raw.sessionTarget === "string" ? raw.sessionTarget.trim().toLowerCase() : ""; + const isIsolatedAgentTurn = + sessionTarget === "isolated" || (sessionTarget === "" && payloadKind === "agentTurn"); + const hasDelivery = delivery && typeof delivery === "object" && !Array.isArray(delivery); + const normalizedLegacy = normalizeLegacyDeliveryInput({ + delivery: hasDelivery ? (delivery as Record) : null, + payload: payloadRecord, + }); + + if (isIsolatedAgentTurn && payloadKind === "agentTurn") { + if (!hasDelivery && normalizedLegacy.delivery) { + raw.delivery = normalizedLegacy.delivery; + mutated = true; + } else if (!hasDelivery) { + raw.delivery = { mode: "announce" }; + mutated = true; + } else if (normalizedLegacy.mutated && normalizedLegacy.delivery) { + raw.delivery = normalizedLegacy.delivery; + mutated = true; + } + } else if (normalizedLegacy.mutated && normalizedLegacy.delivery) { + raw.delivery = normalizedLegacy.delivery; + mutated = true; + } + } + + return { issues, jobs, mutated }; +} diff --git a/src/gateway/server.cron.test.ts b/src/gateway/server.cron.test.ts index ccaf5441237..2590f63c23d 100644 --- a/src/gateway/server.cron.test.ts +++ b/src/gateway/server.cron.test.ts @@ -848,6 +848,32 @@ describe("gateway server cron", () => { 'Cron job "failure destination webhook" failed: unknown error', ); + fetchWithSsrFGuardMock.mockClear(); + cronIsolatedRun.mockResolvedValueOnce({ status: "error", summary: "best-effort failed" }); + const bestEffortFailureDestJobId = await addWebhookCronJob({ + ws, + name: "best effort failure destination webhook", + sessionTarget: "isolated", + delivery: { + mode: "announce", + channel: "telegram", + to: "19098680", + bestEffort: true, + failureDestination: { + mode: "webhook", + to: "https://example.invalid/failure-destination", + }, + }, + }); + const bestEffortFailureDestFinished = waitForCronEvent( + ws, + (payload) => + payload?.jobId === bestEffortFailureDestJobId && payload?.action === "finished", + ); + await runCronJobForce(ws, bestEffortFailureDestJobId); + await bestEffortFailureDestFinished; + expect(fetchWithSsrFGuardMock).not.toHaveBeenCalled(); + cronIsolatedRun.mockResolvedValueOnce({ status: "ok", summary: "" }); const noSummaryJobId = await addWebhookCronJob({ ws, @@ -861,7 +887,7 @@ describe("gateway server cron", () => { ); await runCronJobForce(ws, noSummaryJobId); await noSummaryFinished; - expect(fetchWithSsrFGuardMock).toHaveBeenCalledTimes(1); + expect(fetchWithSsrFGuardMock).not.toHaveBeenCalled(); } finally { await cleanupCronTestRun({ ws, server, prevSkipCron }); } diff --git a/src/gateway/server.hooks.test.ts b/src/gateway/server.hooks.test.ts index 6711671e4ee..2a4e1c961a0 100644 --- a/src/gateway/server.hooks.test.ts +++ b/src/gateway/server.hooks.test.ts @@ -75,6 +75,10 @@ describe("gateway server hooks", () => { expect(resAgent.status).toBe(200); const agentEvents = await waitForSystemEvent(); expect(agentEvents.some((e) => e.includes("Hook Email: done"))).toBe(true); + const firstCall = (cronIsolatedRun.mock.calls[0] as unknown[] | undefined)?.[0] as { + deliveryContract?: string; + }; + expect(firstCall?.deliveryContract).toBe("shared"); drainSystemEvents(resolveMainKey()); mockIsolatedRunOkOnce(); diff --git a/src/gateway/server/hooks.ts b/src/gateway/server/hooks.ts index 3b294be8fb9..3b159c680af 100644 --- a/src/gateway/server/hooks.ts +++ b/src/gateway/server/hooks.ts @@ -76,6 +76,7 @@ export function createGatewayHooksRequestHandler(params: { message: value.message, sessionKey, lane: "cron", + deliveryContract: "shared", }); const summary = result.summary?.trim() || result.error?.trim() || result.status; const prefix =