From 1d4e4314dddc0f4b2f48b5be541ab07b6279a138 Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Sun, 19 Apr 2026 22:48:46 +0530 Subject: [PATCH] fix: preserve deferred cron heartbeat target (#69021) * test(cron): cover deferred heartbeat target preservation * fix(cron): preserve deferred heartbeat target override * test(cron): update timer expectation for deferred heartbeat target * fix(cron): preserve agent heartbeat config for targeted wakes * test(cron): use wake request type in scheduler helper * fix(cron): forward heartbeat overrides through gateway wake adapter * fix(cron): preserve coalesced wake heartbeat overrides * fix: preserve deferred cron heartbeat target (#69021) --- CHANGELOG.md | 1 + ...n-job-passes-heartbeat-target-last.test.ts | 43 +++++++++++++++++-- src/cron/service/state.ts | 4 +- src/cron/service/timer.test.ts | 1 + src/cron/service/timer.ts | 3 ++ src/gateway/server-cron.test.ts | 42 ++++++++++++++++++ src/gateway/server-cron.ts | 1 + src/infra/heartbeat-runner.scheduler.test.ts | 43 ++++++++++++++++++- src/infra/heartbeat-runner.ts | 9 +++- src/infra/heartbeat-wake.test.ts | 33 ++++++++++++++ src/infra/heartbeat-wake.ts | 23 ++++++++-- 11 files changed, 191 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c88f7dacd72..8c6c5fefe80 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ Docs: https://docs.openclaw.ai - Discord/slash commands: tolerate partial Discord channel metadata in slash-command and model-picker flows so partial channel objects no longer crash when channel names, topics, or thread parent metadata are unavailable. (#68953) Thanks @dutifulbob. - BlueBubbles: consolidate outbound HTTP through a typed `BlueBubblesClient` that resolves the SSRF policy once at construction so image attachments stop getting blocked on localhost and reactions stop getting blocked on private-IP BB deployments. Fixes #34749 and #59722. (#68234) Thanks @omarshahine. - Cron/gateway: reject ambiguous announce delivery config at add/update time so invalid multi-channel or target-id provider settings fail early instead of persisting broken cron jobs. (#69015) Thanks @obviyus. +- Cron/main-session delivery: preserve `heartbeat.target="last"` through deferred wake queuing, gateway wake forwarding, and same-target wake coalescing so queued cron replies still return to the last active chat. (#69021) Thanks @obviyus. ## 2026.4.19-beta.2 diff --git a/src/cron/service.main-job-passes-heartbeat-target-last.test.ts b/src/cron/service.main-job-passes-heartbeat-target-last.test.ts index 39959f63207..f4662883b01 100644 --- a/src/cron/service.main-job-passes-heartbeat-target-last.test.ts +++ b/src/cron/service.main-job-passes-heartbeat-target-last.test.ts @@ -88,7 +88,40 @@ describe("cron main job passes heartbeat target=last", () => { expect(callArgs?.heartbeat?.target).toBe("last"); }); - it("should not pass heartbeat target for wakeMode=next-heartbeat main jobs", async () => { + it("should preserve heartbeat.target=last when wakeMode=now falls back to requestHeartbeatNow", async () => { + const { storePath } = await makeStorePath(); + const now = Date.now(); + + const job = createMainCronJob({ + now, + id: "test-main-delivery-busy", + wakeMode: "now", + }); + + await writeCronStoreSnapshot({ storePath, jobs: [job] }); + + const runHeartbeatOnce = vi.fn(async () => ({ + status: "skipped" as const, + reason: "requests-in-flight", + })); + + const { cron, requestHeartbeatNow } = createCronWithSpies({ + storePath, + runHeartbeatOnce, + }); + + await runSingleTick(cron); + + expect(runHeartbeatOnce).toHaveBeenCalled(); + expect(requestHeartbeatNow).toHaveBeenCalledWith( + expect.objectContaining({ + reason: "cron:test-main-delivery-busy", + heartbeat: { target: "last" }, + }), + ); + }); + + it("should preserve heartbeat.target=last for wakeMode=next-heartbeat main jobs", async () => { const { storePath } = await makeStorePath(); const now = Date.now(); @@ -112,9 +145,13 @@ describe("cron main job passes heartbeat target=last", () => { await runSingleTick(cron); - // wakeMode=next-heartbeat uses requestHeartbeatNow, not runHeartbeatOnce expect(requestHeartbeatNow).toHaveBeenCalled(); - // runHeartbeatOnce should NOT have been called for next-heartbeat mode + expect(requestHeartbeatNow).toHaveBeenCalledWith( + expect.objectContaining({ + reason: "cron:test-next-heartbeat", + heartbeat: { target: "last" }, + }), + ); expect(runHeartbeatOnce).not.toHaveBeenCalled(); }); }); diff --git a/src/cron/service/state.ts b/src/cron/service/state.ts index f57073fbf0e..a9adc290ec1 100644 --- a/src/cron/service/state.ts +++ b/src/cron/service/state.ts @@ -1,5 +1,5 @@ import type { CronConfig } from "../../config/types.cron.js"; -import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js"; +import type { HeartbeatRunResult, HeartbeatWakeRequest } from "../../infra/heartbeat-wake.js"; import type { CronDeliveryStatus, CronJob, @@ -64,7 +64,7 @@ export type CronServiceDeps = { text: string, opts?: { agentId?: string; sessionKey?: string; contextKey?: string; trusted?: boolean }, ) => void; - requestHeartbeatNow: (opts?: { reason?: string; agentId?: string; sessionKey?: string }) => void; + requestHeartbeatNow: (opts?: HeartbeatWakeRequest) => void; runHeartbeatOnce?: (opts?: { reason?: string; agentId?: string; diff --git a/src/cron/service/timer.test.ts b/src/cron/service/timer.test.ts index 09824be8c7a..26033177247 100644 --- a/src/cron/service/timer.test.ts +++ b/src/cron/service/timer.test.ts @@ -65,6 +65,7 @@ describe("cron service timer seam coverage", () => { reason: "cron:main-heartbeat-job", agentId: undefined, sessionKey: "agent:main:main", + heartbeat: { target: "last" }, }); const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as { diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 80d0c51f32d..29605100ca7 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -1220,6 +1220,7 @@ async function executeMainSessionCronJob( reason, agentId: job.agentId, sessionKey: targetMainSessionKey, + heartbeat: { target: "last" }, }); return { status: "ok", summary: text }; } @@ -1234,6 +1235,7 @@ async function executeMainSessionCronJob( reason, agentId: job.agentId, sessionKey: targetMainSessionKey, + heartbeat: { target: "last" }, }); return { status: "ok", summary: text }; } @@ -1256,6 +1258,7 @@ async function executeMainSessionCronJob( reason: `cron:${job.id}`, agentId: job.agentId, sessionKey: targetMainSessionKey, + heartbeat: { target: "last" }, }); return { status: "ok", summary: text }; } diff --git a/src/gateway/server-cron.test.ts b/src/gateway/server-cron.test.ts index 3b83cbd70d4..10e6c3757de 100644 --- a/src/gateway/server-cron.test.ts +++ b/src/gateway/server-cron.test.ts @@ -140,6 +140,48 @@ describe("buildGatewayCronService", () => { } }); + it("forwards heartbeat overrides through the cron wake adapter", () => { + const cfg = createCronConfig("server-cron-heartbeat-override"); + loadConfigMock.mockReturnValue(cfg); + + const state = buildGatewayCronService({ + cfg, + deps: {} as CliDeps, + broadcast: () => {}, + }); + try { + const cronDeps = ( + state.cron as unknown as { + state?: { + deps?: { + requestHeartbeatNow?: (opts?: { + agentId?: string; + sessionKey?: string | null; + reason?: string; + heartbeat?: { target?: string }; + }) => void; + }; + }; + } + ).state?.deps; + + cronDeps?.requestHeartbeatNow?.({ + reason: "cron:test", + sessionKey: "discord:channel:ops", + heartbeat: { target: "last" }, + }); + + expect(requestHeartbeatNowMock).toHaveBeenCalledWith({ + reason: "cron:test", + agentId: "main", + sessionKey: "agent:main:discord:channel:ops", + heartbeat: { target: "last" }, + }); + } finally { + state.cron.stop(); + } + }); + it("preserves trust downgrades when cron enqueues system events", () => { const cfg = createCronConfig("server-cron-untrusted"); loadConfigMock.mockReturnValue(cfg); diff --git a/src/gateway/server-cron.ts b/src/gateway/server-cron.ts index f73289facd1..301cea1894a 100644 --- a/src/gateway/server-cron.ts +++ b/src/gateway/server-cron.ts @@ -297,6 +297,7 @@ export function buildGatewayCronService(params: { reason: opts?.reason, agentId, sessionKey, + heartbeat: opts?.heartbeat, }); }, runHeartbeatOnce: async (opts) => { diff --git a/src/infra/heartbeat-runner.scheduler.test.ts b/src/infra/heartbeat-runner.scheduler.test.ts index 4181598de35..a81453b855b 100644 --- a/src/infra/heartbeat-runner.scheduler.test.ts +++ b/src/infra/heartbeat-runner.scheduler.test.ts @@ -58,7 +58,7 @@ describe("startHeartbeatRunner", () => { async function expectWakeDispatch(params: { cfg: OpenClawConfig; runSpy: RunOnce; - wake: { reason: string; agentId?: string; sessionKey?: string; coalesceMs: number }; + wake: Parameters[0]; expectedCall: Record; }) { const runner = startHeartbeatRunner({ @@ -305,6 +305,47 @@ describe("startHeartbeatRunner", () => { runner.stop(); }); + it("merges targeted wake heartbeat overrides onto the agent heartbeat config", async () => { + useFakeHeartbeatTime(); + const runSpy = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 }); + const runner = await expectWakeDispatch({ + cfg: { + ...heartbeatConfig([ + { + id: "ops", + heartbeat: { + every: "15m", + prompt: "Ops prompt", + directPolicy: "block", + target: "discord:channel:ops", + }, + }, + ]), + } as OpenClawConfig, + runSpy, + wake: { + reason: "cron:job-123", + agentId: "ops", + sessionKey: "agent:ops:discord:channel:alerts", + heartbeat: { target: "last" }, + coalesceMs: 0, + }, + expectedCall: { + agentId: "ops", + reason: "cron:job-123", + sessionKey: "agent:ops:discord:channel:alerts", + heartbeat: { + every: "15m", + prompt: "Ops prompt", + directPolicy: "block", + target: "last", + }, + }, + }); + + runner.stop(); + }); + it("does not fan out to unrelated agents for session-scoped exec wakes", async () => { useFakeHeartbeatTime(); const runSpy = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 }); diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index 8259076e342..b889a95701e 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -86,6 +86,7 @@ import { areHeartbeatsEnabled, type HeartbeatRunResult, type HeartbeatWakeHandler, + type HeartbeatWakeRequest, requestHeartbeatNow, setHeartbeatsEnabled, setHeartbeatWakeHandler, @@ -1409,6 +1410,9 @@ export function startHeartbeatRunner(opts: { const reason = params?.reason; const requestedAgentId = params?.agentId ? normalizeAgentId(params.agentId) : undefined; const requestedSessionKey = normalizeOptionalString(params?.sessionKey); + const requestedHeartbeat = params?.heartbeat; + const resolveRequestedHeartbeat = (heartbeat?: HeartbeatConfig) => + requestedHeartbeat ? { ...heartbeat, ...requestedHeartbeat } : heartbeat; const isInterval = reason === "interval"; const startedAt = Date.now(); const now = startedAt; @@ -1428,7 +1432,7 @@ export function startHeartbeatRunner(opts: { const res = await runOnce({ cfg: state.cfg, agentId: targetAgent.agentId, - heartbeat: targetAgent.heartbeat, + heartbeat: resolveRequestedHeartbeat(targetAgent.heartbeat), reason, sessionKey: requestedSessionKey, deps: { runtime: state.runtime }, @@ -1496,11 +1500,12 @@ export function startHeartbeatRunner(opts: { } }; - const wakeHandler: HeartbeatWakeHandler = async (params) => + const wakeHandler: HeartbeatWakeHandler = async (params: HeartbeatWakeRequest) => run({ reason: params.reason, agentId: params.agentId, sessionKey: params.sessionKey, + heartbeat: params.heartbeat, }); const disposeWakeHandler = setHeartbeatWakeHandler(wakeHandler); updateConfig(state.cfg); diff --git a/src/infra/heartbeat-wake.test.ts b/src/infra/heartbeat-wake.test.ts index 5a88210505c..3e5f75e0300 100644 --- a/src/infra/heartbeat-wake.test.ts +++ b/src/infra/heartbeat-wake.test.ts @@ -263,6 +263,7 @@ describe("heartbeat-wake", () => { reason: "cron:job-1", agentId: "ops", sessionKey: "agent:ops:discord:channel:alerts", + heartbeat: { target: "last" }, coalesceMs: 0, }); @@ -272,6 +273,7 @@ describe("heartbeat-wake", () => { reason: "cron:job-1", agentId: "ops", sessionKey: "agent:ops:discord:channel:alerts", + heartbeat: { target: "last" }, }); await vi.advanceTimersByTimeAsync(1000); @@ -280,6 +282,37 @@ describe("heartbeat-wake", () => { reason: "cron:job-1", agentId: "ops", sessionKey: "agent:ops:discord:channel:alerts", + heartbeat: { target: "last" }, + }); + }); + + it("preserves heartbeat override when same-target wakes coalesce", async () => { + vi.useFakeTimers(); + const handler = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 }); + setHeartbeatWakeHandler(handler); + + requestHeartbeatNow({ + reason: "manual", + agentId: "ops", + sessionKey: "agent:ops:discord:channel:alerts", + heartbeat: { target: "last" }, + coalesceMs: 100, + }); + requestHeartbeatNow({ + reason: "manual", + agentId: "ops", + sessionKey: "agent:ops:discord:channel:alerts", + coalesceMs: 100, + }); + + await vi.advanceTimersByTimeAsync(100); + + expect(handler).toHaveBeenCalledTimes(1); + expect(handler).toHaveBeenCalledWith({ + reason: "manual", + agentId: "ops", + sessionKey: "agent:ops:discord:channel:alerts", + heartbeat: { target: "last" }, }); }); diff --git a/src/infra/heartbeat-wake.ts b/src/infra/heartbeat-wake.ts index 08affbd4947..5116e046e8f 100644 --- a/src/infra/heartbeat-wake.ts +++ b/src/infra/heartbeat-wake.ts @@ -10,11 +10,14 @@ export type HeartbeatRunResult = | { status: "skipped"; reason: string } | { status: "failed"; reason: string }; -export type HeartbeatWakeHandler = (opts: { +export type HeartbeatWakeRequest = { reason?: string; agentId?: string; sessionKey?: string; -}) => Promise; + heartbeat?: { target?: string }; +}; + +export type HeartbeatWakeHandler = (opts: HeartbeatWakeRequest) => Promise; let heartbeatsEnabled = true; @@ -33,6 +36,7 @@ type PendingWakeReason = { requestedAt: number; agentId?: string; sessionKey?: string; + heartbeat?: { target?: string }; }; let handler: HeartbeatWakeHandler | null = null; @@ -87,6 +91,7 @@ function queuePendingWakeReason(params?: { requestedAt?: number; agentId?: string; sessionKey?: string; + heartbeat?: { target?: string }; }) { const requestedAt = params?.requestedAt ?? Date.now(); const normalizedReason = normalizeWakeReason(params?.reason); @@ -102,18 +107,23 @@ function queuePendingWakeReason(params?: { requestedAt, agentId: normalizedAgentId, sessionKey: normalizedSessionKey, + heartbeat: params?.heartbeat, }; const previous = pendingWakes.get(wakeTargetKey); if (!previous) { pendingWakes.set(wakeTargetKey, next); return; } + const merged = + (next.heartbeat ?? previous.heartbeat) + ? { ...next, heartbeat: next.heartbeat ?? previous.heartbeat } + : next; if (next.priority > previous.priority) { - pendingWakes.set(wakeTargetKey, next); + pendingWakes.set(wakeTargetKey, merged); return; } if (next.priority === previous.priority && next.requestedAt >= previous.requestedAt) { - pendingWakes.set(wakeTargetKey, next); + pendingWakes.set(wakeTargetKey, merged); } } @@ -162,6 +172,7 @@ function schedule(coalesceMs: number, kind: WakeTimerKind = "normal") { reason: pendingWake.reason ?? undefined, ...(pendingWake.agentId ? { agentId: pendingWake.agentId } : {}), ...(pendingWake.sessionKey ? { sessionKey: pendingWake.sessionKey } : {}), + ...(pendingWake.heartbeat ? { heartbeat: pendingWake.heartbeat } : {}), }; const res = await active(wakeOpts); if (res.status === "skipped" && res.reason === "requests-in-flight") { @@ -170,6 +181,7 @@ function schedule(coalesceMs: number, kind: WakeTimerKind = "normal") { reason: pendingWake.reason ?? "retry", agentId: pendingWake.agentId, sessionKey: pendingWake.sessionKey, + heartbeat: pendingWake.heartbeat, }); schedule(DEFAULT_RETRY_MS, "retry"); } @@ -181,6 +193,7 @@ function schedule(coalesceMs: number, kind: WakeTimerKind = "normal") { reason: pendingWake.reason ?? "retry", agentId: pendingWake.agentId, sessionKey: pendingWake.sessionKey, + heartbeat: pendingWake.heartbeat, }); } schedule(DEFAULT_RETRY_MS, "retry"); @@ -241,11 +254,13 @@ export function requestHeartbeatNow(opts?: { coalesceMs?: number; agentId?: string; sessionKey?: string; + heartbeat?: { target?: string }; }) { queuePendingWakeReason({ reason: opts?.reason, agentId: opts?.agentId, sessionKey: opts?.sessionKey, + heartbeat: opts?.heartbeat, }); schedule(opts?.coalesceMs ?? DEFAULT_COALESCE_MS, "normal"); }