Merge branch 'main' into vincentkoc-code/internal-hook-reply-surfaces

This commit is contained in:
Vincent Koc
2026-03-09 12:19:02 -07:00
committed by GitHub
28 changed files with 1276 additions and 515 deletions

View File

@@ -8,6 +8,8 @@ Docs: https://docs.openclaw.ai
### Breaking
- Cron/doctor: tighten isolated cron delivery so cron jobs can no longer notify through ad hoc agent sends or fallback main-session summaries, and add `openclaw doctor --fix` migration for legacy cron storage and legacy notify/webhook delivery metadata. (#40998) Thanks @mbelinky.
### Fixes
- macOS/LaunchAgent install: tighten LaunchAgent directory and plist permissions during install so launchd bootstrap does not fail when the target home path or generated plist inherited group/world-writable modes.

View File

@@ -29,6 +29,7 @@ Troubleshooting: [/automation/troubleshooting](/automation/troubleshooting)
- Wakeups are first-class: a job can request “wake now” vs “next heartbeat”.
- Webhook posting is per job via `delivery.mode = "webhook"` + `delivery.to = "<url>"`.
- Legacy fallback remains for stored jobs with `notify: true` when `cron.webhook` is set, migrate those jobs to webhook delivery mode.
- For upgrades, `openclaw doctor --fix` can normalize legacy cron store fields before the scheduler touches them.
## Quick start (actionable)

View File

@@ -30,6 +30,12 @@ Note: retention/pruning is controlled in config:
- `cron.sessionRetention` (default `24h`) prunes completed isolated run sessions.
- `cron.runLog.maxBytes` + `cron.runLog.keepLines` prune `~/.openclaw/cron/runs/<jobId>.jsonl`.
Upgrade note: if you have older cron jobs from before the current delivery/store format, run
`openclaw doctor --fix`. Doctor now normalizes legacy cron fields (`jobId`, `schedule.cron`,
top-level delivery fields, payload `provider` delivery aliases) and migrates simple
`notify: true` webhook fallback jobs to explicit webhook delivery when `cron.webhook` is
configured.
## Common edits
Update delivery settings without changing the message:

View File

@@ -28,6 +28,7 @@ Notes:
- Interactive prompts (like keychain/OAuth fixes) only run when stdin is a TTY and `--non-interactive` is **not** set. Headless runs (cron, Telegram, no terminal) will skip prompts.
- `--fix` (alias for `--repair`) writes a backup to `~/.openclaw/openclaw.json.bak` and drops unknown config keys, listing each removal.
- State integrity checks now detect orphan transcript files in the sessions directory and can archive them as `.deleted.<timestamp>` to reclaim space safely.
- Doctor also scans `~/.openclaw/cron/jobs.json` (or `cron.store`) for legacy cron job shapes and can rewrite them in place before the scheduler has to auto-normalize them at runtime.
- Doctor includes a memory-search readiness check and can recommend `openclaw configure --section model` when embedding credentials are missing.
- If sandbox mode is enabled but Docker is unavailable, doctor reports a high-signal warning with remediation (`install Docker` or `openclaw config set agents.defaults.sandbox.mode off`).

View File

@@ -65,6 +65,7 @@ cat ~/.openclaw/openclaw.json
- Config normalization for legacy values.
- OpenCode Zen provider override warnings (`models.providers.opencode`).
- Legacy on-disk state migration (sessions/agent dir/WhatsApp auth).
- Legacy cron store migration (`jobId`, `schedule.cron`, top-level delivery/payload fields, payload `provider`, simple `notify: true` webhook fallback jobs).
- State integrity and permissions checks (sessions, transcripts, state dir).
- Config file permission checks (chmod 600) when running locally.
- Model auth health: checks OAuth expiry, can refresh expiring tokens, and reports auth-profile cooldown/disabled states.
@@ -158,6 +159,25 @@ the legacy sessions + agent dir on startup so history/auth/models land in the
per-agent path without a manual doctor run. WhatsApp auth is intentionally only
migrated via `openclaw doctor`.
### 3b) Legacy cron store migrations
Doctor also checks the cron job store (`~/.openclaw/cron/jobs.json` by default,
or `cron.store` when overridden) for old job shapes that the scheduler still
accepts for compatibility.
Current cron cleanups include:
- `jobId``id`
- `schedule.cron``schedule.expr`
- top-level payload fields (`message`, `model`, `thinking`, ...) → `payload`
- top-level delivery fields (`deliver`, `channel`, `to`, `provider`, ...) → `delivery`
- payload `provider` delivery aliases → explicit `delivery.channel`
- simple legacy `notify: true` webhook fallback jobs → explicit `delivery.mode="webhook"` with `delivery.to=cron.webhook`
Doctor only auto-migrates `notify: true` jobs when it can do so without
changing behavior. If a job combines legacy notify fallback with an existing
non-webhook delivery mode, doctor warns and leaves that job for manual review.
### 4) State integrity checks (session persistence, routing, and safety)
The state directory is the operational brainstem. If it vanishes, you lose

View File

@@ -16,7 +16,7 @@ export function registerCronCli(program: Command) {
.addHelpText(
"after",
() =>
`\n${theme.muted("Docs:")} ${formatDocsLink("/cli/cron", "docs.openclaw.ai/cli/cron")}\n`,
`\n${theme.muted("Docs:")} ${formatDocsLink("/cli/cron", "docs.openclaw.ai/cli/cron")}\n${theme.muted("Upgrade tip:")} run \`openclaw doctor --fix\` to normalize legacy cron job storage.\n`,
);
registerCronStatusCommand(cron);

View File

@@ -0,0 +1,158 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../config/config.js";
import * as noteModule from "../terminal/note.js";
import { maybeRepairLegacyCronStore } from "./doctor-cron.js";
let tempRoot: string | null = null;
async function makeTempStorePath() {
tempRoot = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-doctor-cron-"));
return path.join(tempRoot, "cron", "jobs.json");
}
afterEach(async () => {
vi.restoreAllMocks();
if (tempRoot) {
await fs.rm(tempRoot, { recursive: true, force: true });
tempRoot = null;
}
});
function makePrompter(confirmResult = true) {
return {
confirm: vi.fn().mockResolvedValue(confirmResult),
};
}
describe("maybeRepairLegacyCronStore", () => {
it("repairs legacy cron store fields and migrates notify fallback to webhook delivery", async () => {
const storePath = await makeTempStorePath();
await fs.mkdir(path.dirname(storePath), { recursive: true });
await fs.writeFile(
storePath,
JSON.stringify(
{
version: 1,
jobs: [
{
jobId: "legacy-job",
name: "Legacy job",
notify: true,
createdAtMs: Date.parse("2026-02-01T00:00:00.000Z"),
updatedAtMs: Date.parse("2026-02-02T00:00:00.000Z"),
schedule: { kind: "cron", cron: "0 7 * * *", tz: "UTC" },
payload: {
kind: "systemEvent",
text: "Morning brief",
},
state: {},
},
],
},
null,
2,
),
"utf-8",
);
const noteSpy = vi.spyOn(noteModule, "note").mockImplementation(() => {});
const cfg: OpenClawConfig = {
cron: {
store: storePath,
webhook: "https://example.invalid/cron-finished",
},
};
await maybeRepairLegacyCronStore({
cfg,
options: {},
prompter: makePrompter(true),
});
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as {
jobs: Array<Record<string, unknown>>;
};
const [job] = persisted.jobs;
expect(job?.jobId).toBeUndefined();
expect(job?.id).toBe("legacy-job");
expect(job?.notify).toBeUndefined();
expect(job?.schedule).toMatchObject({
kind: "cron",
expr: "0 7 * * *",
tz: "UTC",
});
expect(job?.delivery).toMatchObject({
mode: "webhook",
to: "https://example.invalid/cron-finished",
});
expect(job?.payload).toMatchObject({
kind: "systemEvent",
text: "Morning brief",
});
expect(noteSpy).toHaveBeenCalledWith(
expect.stringContaining("Legacy cron job storage detected"),
"Cron",
);
expect(noteSpy).toHaveBeenCalledWith(
expect.stringContaining("Cron store normalized"),
"Doctor changes",
);
});
it("warns instead of replacing announce delivery for notify fallback jobs", async () => {
const storePath = await makeTempStorePath();
await fs.mkdir(path.dirname(storePath), { recursive: true });
await fs.writeFile(
storePath,
JSON.stringify(
{
version: 1,
jobs: [
{
id: "notify-and-announce",
name: "Notify and announce",
notify: true,
createdAtMs: Date.parse("2026-02-01T00:00:00.000Z"),
updatedAtMs: Date.parse("2026-02-02T00:00:00.000Z"),
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "isolated",
wakeMode: "now",
payload: { kind: "agentTurn", message: "Status" },
delivery: { mode: "announce", channel: "telegram", to: "123" },
state: {},
},
],
},
null,
2,
),
"utf-8",
);
const noteSpy = vi.spyOn(noteModule, "note").mockImplementation(() => {});
await maybeRepairLegacyCronStore({
cfg: {
cron: {
store: storePath,
webhook: "https://example.invalid/cron-finished",
},
},
options: { nonInteractive: true },
prompter: makePrompter(true),
});
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as {
jobs: Array<Record<string, unknown>>;
};
expect(persisted.jobs[0]?.notify).toBe(true);
expect(noteSpy).toHaveBeenCalledWith(
expect.stringContaining('uses legacy notify fallback alongside delivery mode "announce"'),
"Doctor warnings",
);
});
});

186
src/commands/doctor-cron.ts Normal file
View File

@@ -0,0 +1,186 @@
import { formatCliCommand } from "../cli/command-format.js";
import type { OpenClawConfig } from "../config/config.js";
import { normalizeStoredCronJobs } from "../cron/store-migration.js";
import { resolveCronStorePath, loadCronStore, saveCronStore } from "../cron/store.js";
import type { CronJob } from "../cron/types.js";
import { note } from "../terminal/note.js";
import { shortenHomePath } from "../utils.js";
import type { DoctorPrompter, DoctorOptions } from "./doctor-prompter.js";
type CronDoctorOutcome = {
changed: boolean;
warnings: string[];
};
function pluralize(count: number, noun: string) {
return `${count} ${noun}${count === 1 ? "" : "s"}`;
}
function formatLegacyIssuePreview(issues: Partial<Record<string, number>>): string[] {
const lines: string[] = [];
if (issues.jobId) {
lines.push(`- ${pluralize(issues.jobId, "job")} still uses legacy \`jobId\``);
}
if (issues.legacyScheduleString) {
lines.push(
`- ${pluralize(issues.legacyScheduleString, "job")} stores schedule as a bare string`,
);
}
if (issues.legacyScheduleCron) {
lines.push(`- ${pluralize(issues.legacyScheduleCron, "job")} still uses \`schedule.cron\``);
}
if (issues.legacyPayloadKind) {
lines.push(`- ${pluralize(issues.legacyPayloadKind, "job")} needs payload kind normalization`);
}
if (issues.legacyPayloadProvider) {
lines.push(
`- ${pluralize(issues.legacyPayloadProvider, "job")} still uses payload \`provider\` as a delivery alias`,
);
}
if (issues.legacyTopLevelPayloadFields) {
lines.push(
`- ${pluralize(issues.legacyTopLevelPayloadFields, "job")} still uses top-level payload fields`,
);
}
if (issues.legacyTopLevelDeliveryFields) {
lines.push(
`- ${pluralize(issues.legacyTopLevelDeliveryFields, "job")} still uses top-level delivery fields`,
);
}
if (issues.legacyDeliveryMode) {
lines.push(
`- ${pluralize(issues.legacyDeliveryMode, "job")} still uses delivery mode \`deliver\``,
);
}
return lines;
}
function trimString(value: unknown): string | undefined {
return typeof value === "string" && value.trim() ? value.trim() : undefined;
}
function migrateLegacyNotifyFallback(params: {
jobs: Array<Record<string, unknown>>;
legacyWebhook?: string;
}): CronDoctorOutcome {
let changed = false;
const warnings: string[] = [];
for (const raw of params.jobs) {
if (!("notify" in raw)) {
continue;
}
const jobName = trimString(raw.name) ?? trimString(raw.id) ?? "<unnamed>";
const notify = raw.notify === true;
if (!notify) {
delete raw.notify;
changed = true;
continue;
}
const delivery =
raw.delivery && typeof raw.delivery === "object" && !Array.isArray(raw.delivery)
? (raw.delivery as Record<string, unknown>)
: null;
const mode = trimString(delivery?.mode)?.toLowerCase();
const to = trimString(delivery?.to);
if (mode === "webhook" && to) {
delete raw.notify;
changed = true;
continue;
}
if ((mode === undefined || mode === "none" || mode === "webhook") && params.legacyWebhook) {
raw.delivery = {
...delivery,
mode: "webhook",
to: to ?? params.legacyWebhook,
};
delete raw.notify;
changed = true;
continue;
}
if (!params.legacyWebhook) {
warnings.push(
`Cron job "${jobName}" still uses legacy notify fallback, but cron.webhook is unset so doctor cannot migrate it automatically.`,
);
continue;
}
warnings.push(
`Cron job "${jobName}" uses legacy notify fallback alongside delivery mode "${mode}". Migrate it manually so webhook delivery does not replace existing announce behavior.`,
);
}
return { changed, warnings };
}
export async function maybeRepairLegacyCronStore(params: {
cfg: OpenClawConfig;
options: DoctorOptions;
prompter: Pick<DoctorPrompter, "confirm">;
}) {
const storePath = resolveCronStorePath(params.cfg.cron?.store);
const store = await loadCronStore(storePath);
const rawJobs = (store.jobs ?? []) as unknown as Array<Record<string, unknown>>;
if (rawJobs.length === 0) {
return;
}
const normalized = normalizeStoredCronJobs(rawJobs);
const legacyWebhook = trimString(params.cfg.cron?.webhook);
const notifyCount = rawJobs.filter((job) => job.notify === true).length;
const previewLines = formatLegacyIssuePreview(normalized.issues);
if (notifyCount > 0) {
previewLines.push(
`- ${pluralize(notifyCount, "job")} still uses legacy \`notify: true\` webhook fallback`,
);
}
if (previewLines.length === 0) {
return;
}
note(
[
`Legacy cron job storage detected at ${shortenHomePath(storePath)}.`,
...previewLines,
`Repair with ${formatCliCommand("openclaw doctor --fix")} to normalize the store before the next scheduler run.`,
].join("\n"),
"Cron",
);
const shouldRepair =
params.options.nonInteractive === true
? true
: await params.prompter.confirm({
message: "Repair legacy cron jobs now?",
initialValue: true,
});
if (!shouldRepair) {
return;
}
const notifyMigration = migrateLegacyNotifyFallback({
jobs: rawJobs,
legacyWebhook,
});
const changed = normalized.mutated || notifyMigration.changed;
if (!changed && notifyMigration.warnings.length === 0) {
return;
}
if (changed) {
await saveCronStore(storePath, {
version: 1,
jobs: rawJobs as unknown as CronJob[],
});
note(`Cron store normalized at ${shortenHomePath(storePath)}.`, "Doctor changes");
}
if (notifyMigration.warnings.length > 0) {
note(notifyMigration.warnings.join("\n"), "Doctor warnings");
}
}

View File

@@ -31,6 +31,7 @@ import {
import { noteBootstrapFileSize } from "./doctor-bootstrap-size.js";
import { doctorShellCompletion } from "./doctor-completion.js";
import { loadAndMaybeMigrateDoctorConfig } from "./doctor-config-flow.js";
import { maybeRepairLegacyCronStore } from "./doctor-cron.js";
import { maybeRepairGatewayDaemon } from "./doctor-gateway-daemon-flow.js";
import { checkGatewayHealth, probeGatewayMemoryStatus } from "./doctor-gateway-health.js";
import {
@@ -220,6 +221,11 @@ export async function doctorCommand(
await noteStateIntegrity(cfg, prompter, configResult.path ?? CONFIG_PATH);
await noteSessionLockHealth({ shouldRepair: prompter.shouldRepair });
await maybeRepairLegacyCronStore({
cfg,
options,
prompter,
});
cfg = await maybeRepairSandboxImages(cfg, runtime, prompter);
noteSandboxScopeWarnings(cfg);

View File

@@ -0,0 +1,143 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
const mocks = vi.hoisted(() => ({
resolveDeliveryTarget: vi.fn(),
deliverOutboundPayloads: vi.fn(),
resolveAgentOutboundIdentity: vi.fn().mockReturnValue({ kind: "identity" }),
buildOutboundSessionContext: vi.fn().mockReturnValue({ kind: "session" }),
createOutboundSendDeps: vi.fn().mockReturnValue({ kind: "deps" }),
warn: vi.fn(),
}));
vi.mock("./isolated-agent/delivery-target.js", () => ({
resolveDeliveryTarget: mocks.resolveDeliveryTarget,
}));
vi.mock("../infra/outbound/deliver.js", () => ({
deliverOutboundPayloads: mocks.deliverOutboundPayloads,
}));
vi.mock("../infra/outbound/identity.js", () => ({
resolveAgentOutboundIdentity: mocks.resolveAgentOutboundIdentity,
}));
vi.mock("../infra/outbound/session-context.js", () => ({
buildOutboundSessionContext: mocks.buildOutboundSessionContext,
}));
vi.mock("../cli/outbound-send-deps.js", () => ({
createOutboundSendDeps: mocks.createOutboundSendDeps,
}));
vi.mock("../logging.js", () => ({
getChildLogger: vi.fn(() => ({
warn: mocks.warn,
})),
}));
const { sendFailureNotificationAnnounce } = await import("./delivery.js");
describe("sendFailureNotificationAnnounce", () => {
beforeEach(() => {
vi.clearAllMocks();
mocks.resolveDeliveryTarget.mockResolvedValue({
ok: true,
channel: "telegram",
to: "123",
accountId: "bot-a",
threadId: 42,
mode: "explicit",
});
mocks.deliverOutboundPayloads.mockResolvedValue([{ ok: true }]);
});
afterEach(() => {
vi.useRealTimers();
});
it("delivers failure alerts to the resolved explicit target with strict send settings", async () => {
const deps = {} as never;
const cfg = {} as never;
await sendFailureNotificationAnnounce(
deps,
cfg,
"main",
"job-1",
{ channel: "telegram", to: "123", accountId: "bot-a" },
"Cron failed",
);
expect(mocks.resolveDeliveryTarget).toHaveBeenCalledWith(cfg, "main", {
channel: "telegram",
to: "123",
accountId: "bot-a",
});
expect(mocks.buildOutboundSessionContext).toHaveBeenCalledWith({
cfg,
agentId: "main",
sessionKey: "cron:job-1:failure",
});
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
expect.objectContaining({
cfg,
channel: "telegram",
to: "123",
accountId: "bot-a",
threadId: 42,
payloads: [{ text: "Cron failed" }],
session: { kind: "session" },
identity: { kind: "identity" },
bestEffort: false,
deps: { kind: "deps" },
abortSignal: expect.any(AbortSignal),
}),
);
});
it("does not send when target resolution fails", async () => {
mocks.resolveDeliveryTarget.mockResolvedValue({
ok: false,
error: new Error("target missing"),
});
await sendFailureNotificationAnnounce(
{} as never,
{} as never,
"main",
"job-1",
{ channel: "telegram", to: "123" },
"Cron failed",
);
expect(mocks.deliverOutboundPayloads).not.toHaveBeenCalled();
expect(mocks.warn).toHaveBeenCalledWith(
{ error: "target missing" },
"cron: failed to resolve failure destination target",
);
});
it("swallows outbound delivery errors after logging", async () => {
mocks.deliverOutboundPayloads.mockRejectedValue(new Error("send failed"));
await expect(
sendFailureNotificationAnnounce(
{} as never,
{} as never,
"main",
"job-1",
{ channel: "telegram", to: "123" },
"Cron failed",
),
).resolves.toBeUndefined();
expect(mocks.warn).toHaveBeenCalledWith(
expect.objectContaining({
err: "send failed",
channel: "telegram",
to: "123",
}),
"cron: failure destination announce failed",
);
});
});

View File

@@ -148,6 +148,46 @@ describe("resolveFailureDestination", () => {
expect(plan).toBeNull();
});
it("returns null when webhook failure destination matches the primary webhook target", () => {
const plan = resolveFailureDestination(
makeJob({
sessionTarget: "main",
payload: { kind: "systemEvent", text: "tick" },
delivery: {
mode: "webhook",
to: "https://example.invalid/cron",
failureDestination: {
mode: "webhook",
to: "https://example.invalid/cron",
},
},
}),
undefined,
);
expect(plan).toBeNull();
});
it("does not reuse inherited announce recipient when switching failure destination to webhook", () => {
const plan = resolveFailureDestination(
makeJob({
delivery: {
mode: "announce",
channel: "telegram",
to: "111",
failureDestination: {
mode: "webhook",
},
},
}),
{
channel: "signal",
to: "group-abc",
mode: "announce",
},
);
expect(plan).toBeNull();
});
it("allows job-level failure destination fields to clear inherited global values", () => {
const plan = resolveFailureDestination(
makeJob({

View File

@@ -54,6 +54,7 @@ export async function runTelegramAnnounceTurn(params: {
to?: string;
bestEffort?: boolean;
};
deliveryContract?: "cron-owned" | "shared";
}): Promise<Awaited<ReturnType<typeof runCronIsolatedAgentTurn>>> {
return runCronIsolatedAgentTurn({
cfg: makeCfg(params.home, params.storePath, {
@@ -67,5 +68,6 @@ export async function runTelegramAnnounceTurn(params: {
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
deliveryContract: params.deliveryContract,
});
}

View File

@@ -23,6 +23,7 @@ async function runExplicitTelegramAnnounceTurn(params: {
home: string;
storePath: string;
deps: CliDeps;
deliveryContract?: "cron-owned" | "shared";
}): Promise<Awaited<ReturnType<typeof runCronIsolatedAgentTurn>>> {
return runTelegramAnnounceTurn({
...params,
@@ -301,6 +302,7 @@ describe("runCronIsolatedAgentTurn", () => {
home,
storePath,
deps,
deliveryContract: "shared",
});
expectDeliveredOk(res);

View File

@@ -10,7 +10,7 @@
* returning so the timer correctly skips the system-event fallback.
*/
import { beforeEach, describe, expect, it, vi } from "vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
// --- Module mocks (must be hoisted before imports) ---
@@ -105,7 +105,6 @@ function makeBaseParams(overrides: { synthesizedText?: string; deliveryRequested
resolvedDelivery,
deliveryRequested: overrides.deliveryRequested ?? true,
skipHeartbeatDelivery: false,
skipMessagingToolDelivery: false,
deliveryBestEffort: false,
deliveryPayloadHasStructuredContent: false,
deliveryPayloads: overrides.synthesizedText ? [{ text: overrides.synthesizedText }] : [],
@@ -134,6 +133,10 @@ describe("dispatchCronDelivery — double-announce guard", () => {
vi.mocked(waitForDescendantSubagentSummary).mockResolvedValue(undefined);
});
afterEach(() => {
vi.unstubAllEnvs();
});
it("early return (active subagent) sets deliveryAttempted=true so timer skips enqueueSystemEvent", async () => {
// countActiveDescendantRuns returns >0 → enters wait block; still >0 after wait → early return
vi.mocked(countActiveDescendantRuns).mockReturnValue(2);
@@ -255,6 +258,42 @@ describe("dispatchCronDelivery — double-announce guard", () => {
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
});
it("retries transient direct announce failures before succeeding", async () => {
vi.stubEnv("OPENCLAW_TEST_FAST", "1");
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
vi.mocked(deliverOutboundPayloads)
.mockRejectedValueOnce(new Error("ECONNRESET while sending"))
.mockResolvedValueOnce([{ ok: true } as never]);
const params = makeBaseParams({ synthesizedText: "Retry me once." });
const state = await dispatchCronDelivery(params);
expect(state.result).toBeUndefined();
expect(state.deliveryAttempted).toBe(true);
expect(state.delivered).toBe(true);
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(2);
});
it("does not retry permanent direct announce failures", async () => {
vi.stubEnv("OPENCLAW_TEST_FAST", "1");
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
vi.mocked(deliverOutboundPayloads).mockRejectedValue(new Error("chat not found"));
const params = makeBaseParams({ synthesizedText: "This should fail once." });
const state = await dispatchCronDelivery(params);
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
expect(state.result).toEqual(
expect.objectContaining({
status: "error",
error: "Error: chat not found",
deliveryAttempted: true,
}),
);
});
it("no delivery requested means deliveryAttempted stays false and no delivery is sent", async () => {
const params = makeBaseParams({
synthesizedText: "Task done.",

View File

@@ -96,4 +96,13 @@ describe("resolveCronDeliveryBestEffort", () => {
} as never;
expect(resolveCronDeliveryBestEffort(job)).toBe(true);
});
it("lets explicit delivery.bestEffort=false override legacy payload bestEffortDeliver=true", async () => {
const { resolveCronDeliveryBestEffort } = await import("./delivery-dispatch.js");
const job = {
delivery: { bestEffort: false },
payload: { kind: "agentTurn", bestEffortDeliver: true },
} as never;
expect(resolveCronDeliveryBestEffort(job)).toBe(false);
});
});

View File

@@ -83,7 +83,7 @@ type DispatchCronDeliveryParams = {
resolvedDelivery: DeliveryTargetResolution;
deliveryRequested: boolean;
skipHeartbeatDelivery: boolean;
skipMessagingToolDelivery: boolean;
skipMessagingToolDelivery?: boolean;
deliveryBestEffort: boolean;
deliveryPayloadHasStructuredContent: boolean;
deliveryPayloads: ReplyPayload[];
@@ -192,15 +192,17 @@ async function retryTransientDirectCronDelivery<T>(params: {
export async function dispatchCronDelivery(
params: DispatchCronDeliveryParams,
): Promise<DispatchCronDeliveryState> {
const skipMessagingToolDelivery = params.skipMessagingToolDelivery === true;
let summary = params.summary;
let outputText = params.outputText;
let synthesizedText = params.synthesizedText;
let deliveryPayloads = params.deliveryPayloads;
// `true` means we confirmed at least one outbound send reached the target.
// Keep this strict so timer fallback can safely decide whether to wake main.
let delivered = params.skipMessagingToolDelivery;
let deliveryAttempted = params.skipMessagingToolDelivery;
// Shared callers can treat a matching message-tool send as the completed
// delivery path. Cron-owned callers keep this false so direct cron delivery
// remains the only source of delivered state.
let delivered = skipMessagingToolDelivery;
let deliveryAttempted = skipMessagingToolDelivery;
const failDeliveryTarget = (error: string) =>
params.withRunSession({
status: "error",
@@ -404,11 +406,7 @@ export async function dispatchCronDelivery(
}
};
if (
params.deliveryRequested &&
!params.skipHeartbeatDelivery &&
!params.skipMessagingToolDelivery
) {
if (params.deliveryRequested && !params.skipHeartbeatDelivery && !skipMessagingToolDelivery) {
if (!params.resolvedDelivery.ok) {
if (!params.deliveryBestEffort) {
return {

View File

@@ -55,7 +55,7 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
restoreFastTestEnv(previousFastTestEnv);
});
it('keeps the message tool enabled when delivery.mode is "none"', async () => {
it('disables the message tool when delivery.mode is "none"', async () => {
mockFallbackPassthrough();
resolveCronDeliveryPlanMock.mockReturnValue({
requested: false,
@@ -65,7 +65,7 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
await runCronIsolatedAgentTurn(makeParams());
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.disableMessageTool).toBe(false);
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.disableMessageTool).toBe(true);
});
it("disables the message tool when cron delivery is active", async () => {
@@ -82,4 +82,20 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.disableMessageTool).toBe(true);
});
it("keeps the message tool enabled for shared callers when delivery is not requested", async () => {
mockFallbackPassthrough();
resolveCronDeliveryPlanMock.mockReturnValue({
requested: false,
mode: "none",
});
await runCronIsolatedAgentTurn({
...makeParams(),
deliveryContract: "shared",
});
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.disableMessageTool).toBe(false);
});
});

View File

@@ -78,11 +78,10 @@ export type RunCronAgentTurnResult = {
/** Last non-empty agent text output (not truncated). */
outputText?: string;
/**
* `true` when the isolated run already delivered its output to the target
* channel (via outbound payloads, the subagent announce flow, or a matching
* messaging-tool send). Callers should skip posting a summary to the main
* session to avoid duplicate
* messages. See: https://github.com/openclaw/openclaw/issues/15692
* `true` when the isolated runner already handled the run's user-visible
* delivery outcome. Cron-owned callers use this for cron delivery or
* explicit suppression; shared callers may also use it for a matching
* message-tool send that already reached the target.
*/
delivered?: boolean;
/**
@@ -144,16 +143,22 @@ function buildCronAgentDefaultsConfig(params: {
type ResolvedCronDeliveryTarget = Awaited<ReturnType<typeof resolveDeliveryTarget>>;
type IsolatedDeliveryContract = "cron-owned" | "shared";
function resolveCronToolPolicy(params: {
deliveryRequested: boolean;
resolvedDelivery: ResolvedCronDeliveryTarget;
deliveryContract: IsolatedDeliveryContract;
}) {
return {
// Only enforce an explicit message target when the cron delivery target
// was successfully resolved. When resolution fails the agent should not
// be blocked by a target it cannot satisfy (#27898).
requireExplicitMessageTarget: params.deliveryRequested && params.resolvedDelivery.ok,
disableMessageTool: params.deliveryRequested,
// Cron-owned runs always route user-facing delivery through the runner
// itself. Shared callers keep the previous behavior so non-cron paths do
// not silently lose the message tool when no explicit delivery is active.
disableMessageTool: params.deliveryContract === "cron-owned" ? true : params.deliveryRequested,
};
}
@@ -161,6 +166,7 @@ async function resolveCronDeliveryContext(params: {
cfg: OpenClawConfig;
job: CronJob;
agentId: string;
deliveryContract: IsolatedDeliveryContract;
}) {
const deliveryPlan = resolveCronDeliveryPlan(params.job);
const resolvedDelivery = await resolveDeliveryTarget(params.cfg, params.agentId, {
@@ -176,6 +182,7 @@ async function resolveCronDeliveryContext(params: {
toolPolicy: resolveCronToolPolicy({
deliveryRequested: deliveryPlan.requested,
resolvedDelivery,
deliveryContract: params.deliveryContract,
}),
};
}
@@ -200,6 +207,7 @@ export async function runCronIsolatedAgentTurn(params: {
sessionKey: string;
agentId?: string;
lane?: string;
deliveryContract?: IsolatedDeliveryContract;
}): Promise<RunCronAgentTurnResult> {
const abortSignal = params.abortSignal ?? params.signal;
const isAborted = () => abortSignal?.aborted === true;
@@ -210,6 +218,7 @@ export async function runCronIsolatedAgentTurn(params: {
: "cron: job execution timed out";
};
const isFastTestEnv = process.env.OPENCLAW_TEST_FAST === "1";
const deliveryContract = params.deliveryContract ?? "cron-owned";
const defaultAgentId = resolveDefaultAgentId(params.cfg);
const requestedAgentId =
typeof params.agentId === "string" && params.agentId.trim()
@@ -425,6 +434,7 @@ export async function runCronIsolatedAgentTurn(params: {
cfg: cfgWithAgentDefaults,
job: params.job,
agentId,
deliveryContract,
});
const { formattedTime, timeLine } = resolveCronStyleNow(params.cfg, now);
@@ -807,6 +817,7 @@ export async function runCronIsolatedAgentTurn(params: {
const ackMaxChars = resolveHeartbeatAckMaxChars(agentCfg);
const skipHeartbeatDelivery = deliveryRequested && isHeartbeatOnlyResponse(payloads, ackMaxChars);
const skipMessagingToolDelivery =
deliveryContract === "shared" &&
deliveryRequested &&
finalRunResult.didSendViaMessagingTool === true &&
(finalRunResult.messagingToolSentTargets ?? []).some((target) =>
@@ -816,7 +827,6 @@ export async function runCronIsolatedAgentTurn(params: {
accountId: resolvedDelivery.accountId,
}),
);
const deliveryResult = await dispatchCronDelivery({
cfg: params.cfg,
cfgWithAgentDefaults,

View File

@@ -86,7 +86,7 @@ describe("CronService delivery plan consistency", () => {
});
});
it("treats delivery object without mode as announce", async () => {
it("treats delivery object without mode as announce without reviving legacy relay fallback", async () => {
await withCronService({}, async ({ cron, enqueueSystemEvent }) => {
const job = await addIsolatedAgentTurnJob(cron, {
name: "partial-delivery",
@@ -96,10 +96,8 @@ describe("CronService delivery plan consistency", () => {
const result = await cron.run(job.id, "force");
expect(result).toEqual({ ok: true, ran: true });
expect(enqueueSystemEvent).toHaveBeenCalledWith(
"Cron: done",
expect.objectContaining({ agentId: undefined }),
);
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(cron.getJob(job.id)?.state.lastDeliveryStatus).toBe("unknown");
});
});

View File

@@ -86,7 +86,7 @@ describe("cron isolated job HEARTBEAT_OK summary suppression (#32013)", () => {
expect(requestHeartbeatNow).not.toHaveBeenCalled();
});
it("still enqueues real cron summaries as system events", async () => {
it("does not revive legacy main-session relay for real cron summaries", async () => {
const { storePath } = await makeStorePath();
const now = Date.now();
@@ -109,10 +109,7 @@ describe("cron isolated job HEARTBEAT_OK summary suppression (#32013)", () => {
await runScheduledCron(cron);
// Real summaries SHOULD be enqueued.
expect(enqueueSystemEvent).toHaveBeenCalledWith(
expect.stringContaining("Weather update"),
expect.objectContaining({ agentId: undefined }),
);
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeatNow).not.toHaveBeenCalled();
});
});

View File

@@ -620,14 +620,14 @@ describe("CronService", () => {
await stopCronAndCleanup(cron, store);
});
it("runs an isolated job and posts summary to main", async () => {
it("runs an isolated job without posting a fallback summary to main", async () => {
const runIsolatedAgentJob = vi.fn(async () => ({ status: "ok" as const, summary: "done" }));
const { store, cron, enqueueSystemEvent, requestHeartbeatNow, events } =
await createIsolatedAnnounceHarness(runIsolatedAgentJob);
await runIsolatedAnnounceScenario({ cron, events, name: "weekly" });
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1);
expectMainSystemEventPosted(enqueueSystemEvent, "Cron: done");
expect(requestHeartbeatNow).toHaveBeenCalled();
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeatNow).not.toHaveBeenCalled();
await stopCronAndCleanup(cron, store);
});
@@ -685,7 +685,7 @@ describe("CronService", () => {
await stopCronAndCleanup(cron, store);
});
it("posts last output to main even when isolated job errors", async () => {
it("does not post a fallback main summary when an isolated job errors", async () => {
const runIsolatedAgentJob = vi.fn(async () => ({
status: "error" as const,
summary: "last output",
@@ -700,8 +700,8 @@ describe("CronService", () => {
status: "error",
});
expectMainSystemEventPosted(enqueueSystemEvent, "Cron (error): last output");
expect(requestHeartbeatNow).toHaveBeenCalled();
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeatNow).not.toHaveBeenCalled();
await stopCronAndCleanup(cron, store);
});

View File

@@ -1,161 +1,10 @@
import fs from "node:fs";
import { normalizeLegacyDeliveryInput } from "../legacy-delivery.js";
import { parseAbsoluteTimeMs } from "../parse.js";
import { migrateLegacyCronPayload } from "../payload-migration.js";
import { coerceFiniteScheduleNumber } from "../schedule.js";
import { normalizeCronStaggerMs, resolveDefaultCronStaggerMs } from "../stagger.js";
import { normalizeStoredCronJobs } from "../store-migration.js";
import { loadCronStore, saveCronStore } from "../store.js";
import type { CronJob } from "../types.js";
import { recomputeNextRuns } from "./jobs.js";
import { inferLegacyName, normalizeOptionalText } from "./normalize.js";
import type { CronServiceState } from "./state.js";
function normalizePayloadKind(payload: Record<string, unknown>) {
const raw = typeof payload.kind === "string" ? payload.kind.trim().toLowerCase() : "";
if (raw === "agentturn") {
payload.kind = "agentTurn";
return true;
}
if (raw === "systemevent") {
payload.kind = "systemEvent";
return true;
}
return false;
}
function inferPayloadIfMissing(raw: Record<string, unknown>) {
const message = typeof raw.message === "string" ? raw.message.trim() : "";
const text = typeof raw.text === "string" ? raw.text.trim() : "";
const command = typeof raw.command === "string" ? raw.command.trim() : "";
if (message) {
raw.payload = { kind: "agentTurn", message };
return true;
}
if (text) {
raw.payload = { kind: "systemEvent", text };
return true;
}
if (command) {
raw.payload = { kind: "systemEvent", text: command };
return true;
}
return false;
}
function copyTopLevelAgentTurnFields(
raw: Record<string, unknown>,
payload: Record<string, unknown>,
) {
let mutated = false;
const copyTrimmedString = (field: "model" | "thinking") => {
const existing = payload[field];
if (typeof existing === "string" && existing.trim()) {
return;
}
const value = raw[field];
if (typeof value === "string" && value.trim()) {
payload[field] = value.trim();
mutated = true;
}
};
copyTrimmedString("model");
copyTrimmedString("thinking");
if (
typeof payload.timeoutSeconds !== "number" &&
typeof raw.timeoutSeconds === "number" &&
Number.isFinite(raw.timeoutSeconds)
) {
payload.timeoutSeconds = Math.max(0, Math.floor(raw.timeoutSeconds));
mutated = true;
}
if (
typeof payload.allowUnsafeExternalContent !== "boolean" &&
typeof raw.allowUnsafeExternalContent === "boolean"
) {
payload.allowUnsafeExternalContent = raw.allowUnsafeExternalContent;
mutated = true;
}
if (typeof payload.deliver !== "boolean" && typeof raw.deliver === "boolean") {
payload.deliver = raw.deliver;
mutated = true;
}
if (
typeof payload.channel !== "string" &&
typeof raw.channel === "string" &&
raw.channel.trim()
) {
payload.channel = raw.channel.trim();
mutated = true;
}
if (typeof payload.to !== "string" && typeof raw.to === "string" && raw.to.trim()) {
payload.to = raw.to.trim();
mutated = true;
}
if (
typeof payload.bestEffortDeliver !== "boolean" &&
typeof raw.bestEffortDeliver === "boolean"
) {
payload.bestEffortDeliver = raw.bestEffortDeliver;
mutated = true;
}
if (
typeof payload.provider !== "string" &&
typeof raw.provider === "string" &&
raw.provider.trim()
) {
payload.provider = raw.provider.trim();
mutated = true;
}
return mutated;
}
function stripLegacyTopLevelFields(raw: Record<string, unknown>) {
if ("model" in raw) {
delete raw.model;
}
if ("thinking" in raw) {
delete raw.thinking;
}
if ("timeoutSeconds" in raw) {
delete raw.timeoutSeconds;
}
if ("allowUnsafeExternalContent" in raw) {
delete raw.allowUnsafeExternalContent;
}
if ("message" in raw) {
delete raw.message;
}
if ("text" in raw) {
delete raw.text;
}
if ("deliver" in raw) {
delete raw.deliver;
}
if ("channel" in raw) {
delete raw.channel;
}
if ("to" in raw) {
delete raw.to;
}
if ("bestEffortDeliver" in raw) {
delete raw.bestEffortDeliver;
}
if ("provider" in raw) {
delete raw.provider;
}
if ("command" in raw) {
delete raw.command;
}
if ("timeout" in raw) {
delete raw.timeout;
}
}
async function getFileMtimeMs(path: string): Promise<number | null> {
try {
const stats = await fs.promises.stat(path);
@@ -185,287 +34,7 @@ export async function ensureLoaded(
const fileMtimeMs = await getFileMtimeMs(state.deps.storePath);
const loaded = await loadCronStore(state.deps.storePath);
const jobs = (loaded.jobs ?? []) as unknown as Array<Record<string, unknown>>;
let mutated = false;
for (const raw of jobs) {
const state = raw.state;
if (!state || typeof state !== "object" || Array.isArray(state)) {
raw.state = {};
mutated = true;
}
const rawId = typeof raw.id === "string" ? raw.id.trim() : "";
const legacyJobId = typeof raw.jobId === "string" ? raw.jobId.trim() : "";
if (!rawId && legacyJobId) {
raw.id = legacyJobId;
mutated = true;
} else if (rawId && raw.id !== rawId) {
raw.id = rawId;
mutated = true;
}
if ("jobId" in raw) {
delete raw.jobId;
mutated = true;
}
if (typeof raw.schedule === "string") {
const expr = raw.schedule.trim();
raw.schedule = { kind: "cron", expr };
mutated = true;
}
const nameRaw = raw.name;
if (typeof nameRaw !== "string" || nameRaw.trim().length === 0) {
raw.name = inferLegacyName({
schedule: raw.schedule as never,
payload: raw.payload as never,
});
mutated = true;
} else {
raw.name = nameRaw.trim();
}
const desc = normalizeOptionalText(raw.description);
if (raw.description !== desc) {
raw.description = desc;
mutated = true;
}
if ("sessionKey" in raw) {
const sessionKey =
typeof raw.sessionKey === "string" ? normalizeOptionalText(raw.sessionKey) : undefined;
if (raw.sessionKey !== sessionKey) {
raw.sessionKey = sessionKey;
mutated = true;
}
}
if (typeof raw.enabled !== "boolean") {
raw.enabled = true;
mutated = true;
}
const wakeModeRaw = typeof raw.wakeMode === "string" ? raw.wakeMode.trim().toLowerCase() : "";
if (wakeModeRaw === "next-heartbeat") {
if (raw.wakeMode !== "next-heartbeat") {
raw.wakeMode = "next-heartbeat";
mutated = true;
}
} else if (wakeModeRaw === "now") {
if (raw.wakeMode !== "now") {
raw.wakeMode = "now";
mutated = true;
}
} else {
raw.wakeMode = "now";
mutated = true;
}
const payload = raw.payload;
if (
(!payload || typeof payload !== "object" || Array.isArray(payload)) &&
inferPayloadIfMissing(raw)
) {
mutated = true;
}
const payloadRecord =
raw.payload && typeof raw.payload === "object" && !Array.isArray(raw.payload)
? (raw.payload as Record<string, unknown>)
: null;
if (payloadRecord) {
if (normalizePayloadKind(payloadRecord)) {
mutated = true;
}
if (!payloadRecord.kind) {
if (typeof payloadRecord.message === "string" && payloadRecord.message.trim()) {
payloadRecord.kind = "agentTurn";
mutated = true;
} else if (typeof payloadRecord.text === "string" && payloadRecord.text.trim()) {
payloadRecord.kind = "systemEvent";
mutated = true;
}
}
if (payloadRecord.kind === "agentTurn") {
if (copyTopLevelAgentTurnFields(raw, payloadRecord)) {
mutated = true;
}
}
}
const hadLegacyTopLevelFields =
"model" in raw ||
"thinking" in raw ||
"timeoutSeconds" in raw ||
"allowUnsafeExternalContent" in raw ||
"message" in raw ||
"text" in raw ||
"deliver" in raw ||
"channel" in raw ||
"to" in raw ||
"bestEffortDeliver" in raw ||
"provider" in raw ||
"command" in raw ||
"timeout" in raw;
if (hadLegacyTopLevelFields) {
stripLegacyTopLevelFields(raw);
mutated = true;
}
if (payloadRecord) {
if (migrateLegacyCronPayload(payloadRecord)) {
mutated = true;
}
}
const schedule = raw.schedule;
if (schedule && typeof schedule === "object" && !Array.isArray(schedule)) {
const sched = schedule as Record<string, unknown>;
const kind = typeof sched.kind === "string" ? sched.kind.trim().toLowerCase() : "";
if (!kind && ("at" in sched || "atMs" in sched)) {
sched.kind = "at";
mutated = true;
}
const atRaw = typeof sched.at === "string" ? sched.at.trim() : "";
const atMsRaw = sched.atMs;
const parsedAtMs =
typeof atMsRaw === "number"
? atMsRaw
: typeof atMsRaw === "string"
? parseAbsoluteTimeMs(atMsRaw)
: atRaw
? parseAbsoluteTimeMs(atRaw)
: null;
if (parsedAtMs !== null) {
sched.at = new Date(parsedAtMs).toISOString();
if ("atMs" in sched) {
delete sched.atMs;
}
mutated = true;
}
const everyMsRaw = sched.everyMs;
const everyMsCoerced = coerceFiniteScheduleNumber(everyMsRaw);
const everyMs = everyMsCoerced !== undefined ? Math.floor(everyMsCoerced) : null;
if (everyMs !== null && everyMsRaw !== everyMs) {
sched.everyMs = everyMs;
mutated = true;
}
if ((kind === "every" || sched.kind === "every") && everyMs !== null) {
const anchorRaw = sched.anchorMs;
const anchorCoerced = coerceFiniteScheduleNumber(anchorRaw);
const normalizedAnchor =
anchorCoerced !== undefined
? Math.max(0, Math.floor(anchorCoerced))
: typeof raw.createdAtMs === "number" && Number.isFinite(raw.createdAtMs)
? Math.max(0, Math.floor(raw.createdAtMs))
: typeof raw.updatedAtMs === "number" && Number.isFinite(raw.updatedAtMs)
? Math.max(0, Math.floor(raw.updatedAtMs))
: null;
if (normalizedAnchor !== null && anchorRaw !== normalizedAnchor) {
sched.anchorMs = normalizedAnchor;
mutated = true;
}
}
const exprRaw = typeof sched.expr === "string" ? sched.expr.trim() : "";
const legacyCronRaw = typeof sched.cron === "string" ? sched.cron.trim() : "";
let normalizedExpr = exprRaw;
if (!normalizedExpr && legacyCronRaw) {
normalizedExpr = legacyCronRaw;
sched.expr = normalizedExpr;
mutated = true;
}
if (typeof sched.expr === "string" && sched.expr !== normalizedExpr) {
sched.expr = normalizedExpr;
mutated = true;
}
if ("cron" in sched) {
delete sched.cron;
mutated = true;
}
if ((kind === "cron" || sched.kind === "cron") && normalizedExpr) {
const explicitStaggerMs = normalizeCronStaggerMs(sched.staggerMs);
const defaultStaggerMs = resolveDefaultCronStaggerMs(normalizedExpr);
const targetStaggerMs = explicitStaggerMs ?? defaultStaggerMs;
if (targetStaggerMs === undefined) {
if ("staggerMs" in sched) {
delete sched.staggerMs;
mutated = true;
}
} else if (sched.staggerMs !== targetStaggerMs) {
sched.staggerMs = targetStaggerMs;
mutated = true;
}
}
}
const delivery = raw.delivery;
if (delivery && typeof delivery === "object" && !Array.isArray(delivery)) {
const modeRaw = (delivery as { mode?: unknown }).mode;
if (typeof modeRaw === "string") {
const lowered = modeRaw.trim().toLowerCase();
if (lowered === "deliver") {
(delivery as { mode?: unknown }).mode = "announce";
mutated = true;
}
} else if (modeRaw === undefined || modeRaw === null) {
// Explicitly persist the default so existing jobs don't silently
// change behaviour when the runtime default shifts.
(delivery as { mode?: unknown }).mode = "announce";
mutated = true;
}
}
const isolation = raw.isolation;
if (isolation && typeof isolation === "object" && !Array.isArray(isolation)) {
delete raw.isolation;
mutated = true;
}
const payloadKind =
payloadRecord && typeof payloadRecord.kind === "string" ? payloadRecord.kind : "";
const normalizedSessionTarget =
typeof raw.sessionTarget === "string" ? raw.sessionTarget.trim().toLowerCase() : "";
if (normalizedSessionTarget === "main" || normalizedSessionTarget === "isolated") {
if (raw.sessionTarget !== normalizedSessionTarget) {
raw.sessionTarget = normalizedSessionTarget;
mutated = true;
}
} else {
const inferredSessionTarget = payloadKind === "agentTurn" ? "isolated" : "main";
if (raw.sessionTarget !== inferredSessionTarget) {
raw.sessionTarget = inferredSessionTarget;
mutated = true;
}
}
const sessionTarget =
typeof raw.sessionTarget === "string" ? raw.sessionTarget.trim().toLowerCase() : "";
const isIsolatedAgentTurn =
sessionTarget === "isolated" || (sessionTarget === "" && payloadKind === "agentTurn");
const hasDelivery = delivery && typeof delivery === "object" && !Array.isArray(delivery);
const normalizedLegacy = normalizeLegacyDeliveryInput({
delivery: hasDelivery ? (delivery as Record<string, unknown>) : null,
payload: payloadRecord,
});
if (isIsolatedAgentTurn && payloadKind === "agentTurn") {
if (!hasDelivery && normalizedLegacy.delivery) {
raw.delivery = normalizedLegacy.delivery;
mutated = true;
} else if (!hasDelivery) {
raw.delivery = { mode: "announce" };
mutated = true;
} else if (normalizedLegacy.mutated && normalizedLegacy.delivery) {
raw.delivery = normalizedLegacy.delivery;
mutated = true;
}
} else if (normalizedLegacy.mutated && normalizedLegacy.delivery) {
raw.delivery = normalizedLegacy.delivery;
mutated = true;
}
}
const { mutated } = normalizeStoredCronJobs(jobs);
state.store = { version: 1, jobs: jobs as unknown as CronJob[] };
state.storeLoadedAtMs = state.deps.nowMs();
state.storeFileMtimeMs = fileMtimeMs;

View File

@@ -1,9 +1,7 @@
import type { CronConfig, CronRetryOn } from "../../config/types.cron.js";
import { isCronSystemEvent } from "../../infra/heartbeat-events-filter.js";
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
import { DEFAULT_AGENT_ID } from "../../routing/session-key.js";
import { resolveCronDeliveryPlan } from "../delivery.js";
import { shouldEnqueueCronMainSummary } from "../heartbeat-policy.js";
import { sweepCronRunSessions } from "../session-reaper.js";
import type {
CronDeliveryStatus,
@@ -1138,46 +1136,6 @@ export async function executeJobCore(
return { status: "error", error: timeoutErrorMessage() };
}
// Post a short summary back to the main session only when announce
// delivery was requested and we are confident no outbound delivery path
// ran. If delivery was attempted but final ack is uncertain, suppress the
// main summary to avoid duplicate user-facing sends.
// See: https://github.com/openclaw/openclaw/issues/15692
//
// Also suppress heartbeat-only summaries (e.g. "HEARTBEAT_OK") — these
// are internal ack tokens that should never leak into user conversations.
// See: https://github.com/openclaw/openclaw/issues/32013
const summaryText = res.summary?.trim();
const deliveryPlan = resolveCronDeliveryPlan(job);
const suppressMainSummary =
res.status === "error" && res.errorKind === "delivery-target" && deliveryPlan.requested;
if (
shouldEnqueueCronMainSummary({
summaryText,
deliveryRequested: deliveryPlan.requested,
delivered: res.delivered,
deliveryAttempted: res.deliveryAttempted,
suppressMainSummary,
isCronSystemEvent,
})
) {
const prefix = "Cron";
const label =
res.status === "error" ? `${prefix} (error): ${summaryText}` : `${prefix}: ${summaryText}`;
state.deps.enqueueSystemEvent(label, {
agentId: job.agentId,
sessionKey: job.sessionKey,
contextKey: `cron:${job.id}`,
});
if (job.wakeMode === "now") {
state.deps.requestHeartbeatNow({
reason: `cron:${job.id}`,
agentId: job.agentId,
sessionKey: job.sessionKey,
});
}
}
return {
status: res.status,
error: res.error,

View File

@@ -0,0 +1,78 @@
import { describe, expect, it } from "vitest";
import { normalizeStoredCronJobs } from "./store-migration.js";
describe("normalizeStoredCronJobs", () => {
it("normalizes legacy cron fields and reports migration issues", () => {
const jobs = [
{
jobId: "legacy-job",
schedule: { kind: "cron", cron: "*/5 * * * *", tz: "UTC" },
message: "say hi",
model: "openai/gpt-4.1",
deliver: true,
provider: " TeLeGrAm ",
to: "12345",
},
] as Array<Record<string, unknown>>;
const result = normalizeStoredCronJobs(jobs);
expect(result.mutated).toBe(true);
expect(result.issues).toMatchObject({
jobId: 1,
legacyScheduleCron: 1,
legacyTopLevelPayloadFields: 1,
legacyTopLevelDeliveryFields: 1,
});
const [job] = jobs;
expect(job?.jobId).toBeUndefined();
expect(job?.id).toBe("legacy-job");
expect(job?.schedule).toMatchObject({
kind: "cron",
expr: "*/5 * * * *",
tz: "UTC",
});
expect(job?.message).toBeUndefined();
expect(job?.provider).toBeUndefined();
expect(job?.delivery).toMatchObject({
mode: "announce",
channel: "telegram",
to: "12345",
});
expect(job?.payload).toMatchObject({
kind: "agentTurn",
message: "say hi",
model: "openai/gpt-4.1",
});
});
it("normalizes payload provider alias into channel", () => {
const jobs = [
{
id: "legacy-provider",
schedule: { kind: "every", everyMs: 60_000 },
payload: {
kind: "agentTurn",
message: "ping",
provider: " Slack ",
},
},
] as Array<Record<string, unknown>>;
const result = normalizeStoredCronJobs(jobs);
expect(result.mutated).toBe(true);
expect(result.issues.legacyPayloadProvider).toBe(1);
expect(jobs[0]?.payload).toMatchObject({
kind: "agentTurn",
message: "ping",
});
const payload = jobs[0]?.payload as Record<string, unknown> | undefined;
expect(payload?.provider).toBeUndefined();
expect(jobs[0]?.delivery).toMatchObject({
mode: "announce",
channel: "slack",
});
});
});

491
src/cron/store-migration.ts Normal file
View File

@@ -0,0 +1,491 @@
import { normalizeLegacyDeliveryInput } from "./legacy-delivery.js";
import { parseAbsoluteTimeMs } from "./parse.js";
import { migrateLegacyCronPayload } from "./payload-migration.js";
import { coerceFiniteScheduleNumber } from "./schedule.js";
import { inferLegacyName, normalizeOptionalText } from "./service/normalize.js";
import { normalizeCronStaggerMs, resolveDefaultCronStaggerMs } from "./stagger.js";
type CronStoreIssueKey =
| "jobId"
| "legacyScheduleString"
| "legacyScheduleCron"
| "legacyPayloadKind"
| "legacyPayloadProvider"
| "legacyTopLevelPayloadFields"
| "legacyTopLevelDeliveryFields"
| "legacyDeliveryMode";
type CronStoreIssues = Partial<Record<CronStoreIssueKey, number>>;
type NormalizeCronStoreJobsResult = {
issues: CronStoreIssues;
jobs: Array<Record<string, unknown>>;
mutated: boolean;
};
function incrementIssue(issues: CronStoreIssues, key: CronStoreIssueKey) {
issues[key] = (issues[key] ?? 0) + 1;
}
function normalizePayloadKind(payload: Record<string, unknown>) {
const raw = typeof payload.kind === "string" ? payload.kind.trim().toLowerCase() : "";
if (raw === "agentturn") {
payload.kind = "agentTurn";
return true;
}
if (raw === "systemevent") {
payload.kind = "systemEvent";
return true;
}
return false;
}
function inferPayloadIfMissing(raw: Record<string, unknown>) {
const message = typeof raw.message === "string" ? raw.message.trim() : "";
const text = typeof raw.text === "string" ? raw.text.trim() : "";
const command = typeof raw.command === "string" ? raw.command.trim() : "";
if (message) {
raw.payload = { kind: "agentTurn", message };
return true;
}
if (text) {
raw.payload = { kind: "systemEvent", text };
return true;
}
if (command) {
raw.payload = { kind: "systemEvent", text: command };
return true;
}
return false;
}
function copyTopLevelAgentTurnFields(
raw: Record<string, unknown>,
payload: Record<string, unknown>,
) {
let mutated = false;
const copyTrimmedString = (field: "model" | "thinking") => {
const existing = payload[field];
if (typeof existing === "string" && existing.trim()) {
return;
}
const value = raw[field];
if (typeof value === "string" && value.trim()) {
payload[field] = value.trim();
mutated = true;
}
};
copyTrimmedString("model");
copyTrimmedString("thinking");
if (
typeof payload.timeoutSeconds !== "number" &&
typeof raw.timeoutSeconds === "number" &&
Number.isFinite(raw.timeoutSeconds)
) {
payload.timeoutSeconds = Math.max(0, Math.floor(raw.timeoutSeconds));
mutated = true;
}
if (
typeof payload.allowUnsafeExternalContent !== "boolean" &&
typeof raw.allowUnsafeExternalContent === "boolean"
) {
payload.allowUnsafeExternalContent = raw.allowUnsafeExternalContent;
mutated = true;
}
if (typeof payload.deliver !== "boolean" && typeof raw.deliver === "boolean") {
payload.deliver = raw.deliver;
mutated = true;
}
if (
typeof payload.channel !== "string" &&
typeof raw.channel === "string" &&
raw.channel.trim()
) {
payload.channel = raw.channel.trim();
mutated = true;
}
if (typeof payload.to !== "string" && typeof raw.to === "string" && raw.to.trim()) {
payload.to = raw.to.trim();
mutated = true;
}
if (
typeof payload.bestEffortDeliver !== "boolean" &&
typeof raw.bestEffortDeliver === "boolean"
) {
payload.bestEffortDeliver = raw.bestEffortDeliver;
mutated = true;
}
if (
typeof payload.provider !== "string" &&
typeof raw.provider === "string" &&
raw.provider.trim()
) {
payload.provider = raw.provider.trim();
mutated = true;
}
return mutated;
}
function stripLegacyTopLevelFields(raw: Record<string, unknown>) {
if ("model" in raw) {
delete raw.model;
}
if ("thinking" in raw) {
delete raw.thinking;
}
if ("timeoutSeconds" in raw) {
delete raw.timeoutSeconds;
}
if ("allowUnsafeExternalContent" in raw) {
delete raw.allowUnsafeExternalContent;
}
if ("message" in raw) {
delete raw.message;
}
if ("text" in raw) {
delete raw.text;
}
if ("deliver" in raw) {
delete raw.deliver;
}
if ("channel" in raw) {
delete raw.channel;
}
if ("to" in raw) {
delete raw.to;
}
if ("bestEffortDeliver" in raw) {
delete raw.bestEffortDeliver;
}
if ("provider" in raw) {
delete raw.provider;
}
if ("command" in raw) {
delete raw.command;
}
if ("timeout" in raw) {
delete raw.timeout;
}
}
export function normalizeStoredCronJobs(
jobs: Array<Record<string, unknown>>,
): NormalizeCronStoreJobsResult {
const issues: CronStoreIssues = {};
let mutated = false;
for (const raw of jobs) {
const jobIssues = new Set<CronStoreIssueKey>();
const trackIssue = (key: CronStoreIssueKey) => {
if (jobIssues.has(key)) {
return;
}
jobIssues.add(key);
incrementIssue(issues, key);
};
const state = raw.state;
if (!state || typeof state !== "object" || Array.isArray(state)) {
raw.state = {};
mutated = true;
}
const rawId = typeof raw.id === "string" ? raw.id.trim() : "";
const legacyJobId = typeof raw.jobId === "string" ? raw.jobId.trim() : "";
if (!rawId && legacyJobId) {
raw.id = legacyJobId;
mutated = true;
trackIssue("jobId");
} else if (rawId && raw.id !== rawId) {
raw.id = rawId;
mutated = true;
}
if ("jobId" in raw) {
delete raw.jobId;
mutated = true;
trackIssue("jobId");
}
if (typeof raw.schedule === "string") {
const expr = raw.schedule.trim();
raw.schedule = { kind: "cron", expr };
mutated = true;
trackIssue("legacyScheduleString");
}
const nameRaw = raw.name;
if (typeof nameRaw !== "string" || nameRaw.trim().length === 0) {
raw.name = inferLegacyName({
schedule: raw.schedule as never,
payload: raw.payload as never,
});
mutated = true;
} else {
raw.name = nameRaw.trim();
}
const desc = normalizeOptionalText(raw.description);
if (raw.description !== desc) {
raw.description = desc;
mutated = true;
}
if ("sessionKey" in raw) {
const sessionKey =
typeof raw.sessionKey === "string" ? normalizeOptionalText(raw.sessionKey) : undefined;
if (raw.sessionKey !== sessionKey) {
raw.sessionKey = sessionKey;
mutated = true;
}
}
if (typeof raw.enabled !== "boolean") {
raw.enabled = true;
mutated = true;
}
const wakeModeRaw = typeof raw.wakeMode === "string" ? raw.wakeMode.trim().toLowerCase() : "";
if (wakeModeRaw === "next-heartbeat") {
if (raw.wakeMode !== "next-heartbeat") {
raw.wakeMode = "next-heartbeat";
mutated = true;
}
} else if (wakeModeRaw === "now") {
if (raw.wakeMode !== "now") {
raw.wakeMode = "now";
mutated = true;
}
} else {
raw.wakeMode = "now";
mutated = true;
}
const payload = raw.payload;
if (
(!payload || typeof payload !== "object" || Array.isArray(payload)) &&
inferPayloadIfMissing(raw)
) {
mutated = true;
trackIssue("legacyTopLevelPayloadFields");
}
const payloadRecord =
raw.payload && typeof raw.payload === "object" && !Array.isArray(raw.payload)
? (raw.payload as Record<string, unknown>)
: null;
if (payloadRecord) {
if (normalizePayloadKind(payloadRecord)) {
mutated = true;
trackIssue("legacyPayloadKind");
}
if (!payloadRecord.kind) {
if (typeof payloadRecord.message === "string" && payloadRecord.message.trim()) {
payloadRecord.kind = "agentTurn";
mutated = true;
trackIssue("legacyPayloadKind");
} else if (typeof payloadRecord.text === "string" && payloadRecord.text.trim()) {
payloadRecord.kind = "systemEvent";
mutated = true;
trackIssue("legacyPayloadKind");
}
}
if (payloadRecord.kind === "agentTurn" && copyTopLevelAgentTurnFields(raw, payloadRecord)) {
mutated = true;
}
}
const hadLegacyTopLevelPayloadFields =
"model" in raw ||
"thinking" in raw ||
"timeoutSeconds" in raw ||
"allowUnsafeExternalContent" in raw ||
"message" in raw ||
"text" in raw ||
"command" in raw ||
"timeout" in raw;
const hadLegacyTopLevelDeliveryFields =
"deliver" in raw ||
"channel" in raw ||
"to" in raw ||
"bestEffortDeliver" in raw ||
"provider" in raw;
if (hadLegacyTopLevelPayloadFields || hadLegacyTopLevelDeliveryFields) {
stripLegacyTopLevelFields(raw);
mutated = true;
if (hadLegacyTopLevelPayloadFields) {
trackIssue("legacyTopLevelPayloadFields");
}
if (hadLegacyTopLevelDeliveryFields) {
trackIssue("legacyTopLevelDeliveryFields");
}
}
if (payloadRecord) {
const hadLegacyPayloadProvider =
typeof payloadRecord.provider === "string" && payloadRecord.provider.trim().length > 0;
if (migrateLegacyCronPayload(payloadRecord)) {
mutated = true;
if (hadLegacyPayloadProvider) {
trackIssue("legacyPayloadProvider");
}
}
}
const schedule = raw.schedule;
if (schedule && typeof schedule === "object" && !Array.isArray(schedule)) {
const sched = schedule as Record<string, unknown>;
const kind = typeof sched.kind === "string" ? sched.kind.trim().toLowerCase() : "";
if (!kind && ("at" in sched || "atMs" in sched)) {
sched.kind = "at";
mutated = true;
}
const atRaw = typeof sched.at === "string" ? sched.at.trim() : "";
const atMsRaw = sched.atMs;
const parsedAtMs =
typeof atMsRaw === "number"
? atMsRaw
: typeof atMsRaw === "string"
? parseAbsoluteTimeMs(atMsRaw)
: atRaw
? parseAbsoluteTimeMs(atRaw)
: null;
if (parsedAtMs !== null) {
sched.at = new Date(parsedAtMs).toISOString();
if ("atMs" in sched) {
delete sched.atMs;
}
mutated = true;
}
const everyMsRaw = sched.everyMs;
const everyMsCoerced = coerceFiniteScheduleNumber(everyMsRaw);
const everyMs = everyMsCoerced !== undefined ? Math.floor(everyMsCoerced) : null;
if (everyMs !== null && everyMsRaw !== everyMs) {
sched.everyMs = everyMs;
mutated = true;
}
if ((kind === "every" || sched.kind === "every") && everyMs !== null) {
const anchorRaw = sched.anchorMs;
const anchorCoerced = coerceFiniteScheduleNumber(anchorRaw);
const normalizedAnchor =
anchorCoerced !== undefined
? Math.max(0, Math.floor(anchorCoerced))
: typeof raw.createdAtMs === "number" && Number.isFinite(raw.createdAtMs)
? Math.max(0, Math.floor(raw.createdAtMs))
: typeof raw.updatedAtMs === "number" && Number.isFinite(raw.updatedAtMs)
? Math.max(0, Math.floor(raw.updatedAtMs))
: null;
if (normalizedAnchor !== null && anchorRaw !== normalizedAnchor) {
sched.anchorMs = normalizedAnchor;
mutated = true;
}
}
const exprRaw = typeof sched.expr === "string" ? sched.expr.trim() : "";
const legacyCronRaw = typeof sched.cron === "string" ? sched.cron.trim() : "";
let normalizedExpr = exprRaw;
if (!normalizedExpr && legacyCronRaw) {
normalizedExpr = legacyCronRaw;
sched.expr = normalizedExpr;
mutated = true;
trackIssue("legacyScheduleCron");
}
if (typeof sched.expr === "string" && sched.expr !== normalizedExpr) {
sched.expr = normalizedExpr;
mutated = true;
}
if ("cron" in sched) {
delete sched.cron;
mutated = true;
trackIssue("legacyScheduleCron");
}
if ((kind === "cron" || sched.kind === "cron") && normalizedExpr) {
const explicitStaggerMs = normalizeCronStaggerMs(sched.staggerMs);
const defaultStaggerMs = resolveDefaultCronStaggerMs(normalizedExpr);
const targetStaggerMs = explicitStaggerMs ?? defaultStaggerMs;
if (targetStaggerMs === undefined) {
if ("staggerMs" in sched) {
delete sched.staggerMs;
mutated = true;
}
} else if (sched.staggerMs !== targetStaggerMs) {
sched.staggerMs = targetStaggerMs;
mutated = true;
}
}
}
const delivery = raw.delivery;
if (delivery && typeof delivery === "object" && !Array.isArray(delivery)) {
const modeRaw = (delivery as { mode?: unknown }).mode;
if (typeof modeRaw === "string") {
const lowered = modeRaw.trim().toLowerCase();
if (lowered === "deliver") {
(delivery as { mode?: unknown }).mode = "announce";
mutated = true;
trackIssue("legacyDeliveryMode");
}
} else if (modeRaw === undefined || modeRaw === null) {
(delivery as { mode?: unknown }).mode = "announce";
mutated = true;
}
}
const isolation = raw.isolation;
if (isolation && typeof isolation === "object" && !Array.isArray(isolation)) {
delete raw.isolation;
mutated = true;
}
const payloadKind =
payloadRecord && typeof payloadRecord.kind === "string" ? payloadRecord.kind : "";
const normalizedSessionTarget =
typeof raw.sessionTarget === "string" ? raw.sessionTarget.trim().toLowerCase() : "";
if (normalizedSessionTarget === "main" || normalizedSessionTarget === "isolated") {
if (raw.sessionTarget !== normalizedSessionTarget) {
raw.sessionTarget = normalizedSessionTarget;
mutated = true;
}
} else {
const inferredSessionTarget = payloadKind === "agentTurn" ? "isolated" : "main";
if (raw.sessionTarget !== inferredSessionTarget) {
raw.sessionTarget = inferredSessionTarget;
mutated = true;
}
}
const sessionTarget =
typeof raw.sessionTarget === "string" ? raw.sessionTarget.trim().toLowerCase() : "";
const isIsolatedAgentTurn =
sessionTarget === "isolated" || (sessionTarget === "" && payloadKind === "agentTurn");
const hasDelivery = delivery && typeof delivery === "object" && !Array.isArray(delivery);
const normalizedLegacy = normalizeLegacyDeliveryInput({
delivery: hasDelivery ? (delivery as Record<string, unknown>) : null,
payload: payloadRecord,
});
if (isIsolatedAgentTurn && payloadKind === "agentTurn") {
if (!hasDelivery && normalizedLegacy.delivery) {
raw.delivery = normalizedLegacy.delivery;
mutated = true;
} else if (!hasDelivery) {
raw.delivery = { mode: "announce" };
mutated = true;
} else if (normalizedLegacy.mutated && normalizedLegacy.delivery) {
raw.delivery = normalizedLegacy.delivery;
mutated = true;
}
} else if (normalizedLegacy.mutated && normalizedLegacy.delivery) {
raw.delivery = normalizedLegacy.delivery;
mutated = true;
}
}
return { issues, jobs, mutated };
}

View File

@@ -848,6 +848,32 @@ describe("gateway server cron", () => {
'Cron job "failure destination webhook" failed: unknown error',
);
fetchWithSsrFGuardMock.mockClear();
cronIsolatedRun.mockResolvedValueOnce({ status: "error", summary: "best-effort failed" });
const bestEffortFailureDestJobId = await addWebhookCronJob({
ws,
name: "best effort failure destination webhook",
sessionTarget: "isolated",
delivery: {
mode: "announce",
channel: "telegram",
to: "19098680",
bestEffort: true,
failureDestination: {
mode: "webhook",
to: "https://example.invalid/failure-destination",
},
},
});
const bestEffortFailureDestFinished = waitForCronEvent(
ws,
(payload) =>
payload?.jobId === bestEffortFailureDestJobId && payload?.action === "finished",
);
await runCronJobForce(ws, bestEffortFailureDestJobId);
await bestEffortFailureDestFinished;
expect(fetchWithSsrFGuardMock).not.toHaveBeenCalled();
cronIsolatedRun.mockResolvedValueOnce({ status: "ok", summary: "" });
const noSummaryJobId = await addWebhookCronJob({
ws,
@@ -861,7 +887,7 @@ describe("gateway server cron", () => {
);
await runCronJobForce(ws, noSummaryJobId);
await noSummaryFinished;
expect(fetchWithSsrFGuardMock).toHaveBeenCalledTimes(1);
expect(fetchWithSsrFGuardMock).not.toHaveBeenCalled();
} finally {
await cleanupCronTestRun({ ws, server, prevSkipCron });
}

View File

@@ -75,6 +75,10 @@ describe("gateway server hooks", () => {
expect(resAgent.status).toBe(200);
const agentEvents = await waitForSystemEvent();
expect(agentEvents.some((e) => e.includes("Hook Email: done"))).toBe(true);
const firstCall = (cronIsolatedRun.mock.calls[0] as unknown[] | undefined)?.[0] as {
deliveryContract?: string;
};
expect(firstCall?.deliveryContract).toBe("shared");
drainSystemEvents(resolveMainKey());
mockIsolatedRunOkOnce();

View File

@@ -76,6 +76,7 @@ export function createGatewayHooksRequestHandler(params: {
message: value.message,
sessionKey,
lane: "cron",
deliveryContract: "shared",
});
const summary = result.summary?.trim() || result.error?.trim() || result.status;
const prefix =