mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 09:20:43 +00:00
fix: preserve deferred cron heartbeat target (#69021)
* test(cron): cover deferred heartbeat target preservation * fix(cron): preserve deferred heartbeat target override * test(cron): update timer expectation for deferred heartbeat target * fix(cron): preserve agent heartbeat config for targeted wakes * test(cron): use wake request type in scheduler helper * fix(cron): forward heartbeat overrides through gateway wake adapter * fix(cron): preserve coalesced wake heartbeat overrides * fix: preserve deferred cron heartbeat target (#69021)
This commit is contained in:
@@ -19,6 +19,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Discord/slash commands: tolerate partial Discord channel metadata in slash-command and model-picker flows so partial channel objects no longer crash when channel names, topics, or thread parent metadata are unavailable. (#68953) Thanks @dutifulbob.
|
||||
- BlueBubbles: consolidate outbound HTTP through a typed `BlueBubblesClient` that resolves the SSRF policy once at construction so image attachments stop getting blocked on localhost and reactions stop getting blocked on private-IP BB deployments. Fixes #34749 and #59722. (#68234) Thanks @omarshahine.
|
||||
- Cron/gateway: reject ambiguous announce delivery config at add/update time so invalid multi-channel or target-id provider settings fail early instead of persisting broken cron jobs. (#69015) Thanks @obviyus.
|
||||
- Cron/main-session delivery: preserve `heartbeat.target="last"` through deferred wake queuing, gateway wake forwarding, and same-target wake coalescing so queued cron replies still return to the last active chat. (#69021) Thanks @obviyus.
|
||||
|
||||
## 2026.4.19-beta.2
|
||||
|
||||
|
||||
@@ -88,7 +88,40 @@ describe("cron main job passes heartbeat target=last", () => {
|
||||
expect(callArgs?.heartbeat?.target).toBe("last");
|
||||
});
|
||||
|
||||
it("should not pass heartbeat target for wakeMode=next-heartbeat main jobs", async () => {
|
||||
it("should preserve heartbeat.target=last when wakeMode=now falls back to requestHeartbeatNow", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
const now = Date.now();
|
||||
|
||||
const job = createMainCronJob({
|
||||
now,
|
||||
id: "test-main-delivery-busy",
|
||||
wakeMode: "now",
|
||||
});
|
||||
|
||||
await writeCronStoreSnapshot({ storePath, jobs: [job] });
|
||||
|
||||
const runHeartbeatOnce = vi.fn<RunHeartbeatOnce>(async () => ({
|
||||
status: "skipped" as const,
|
||||
reason: "requests-in-flight",
|
||||
}));
|
||||
|
||||
const { cron, requestHeartbeatNow } = createCronWithSpies({
|
||||
storePath,
|
||||
runHeartbeatOnce,
|
||||
});
|
||||
|
||||
await runSingleTick(cron);
|
||||
|
||||
expect(runHeartbeatOnce).toHaveBeenCalled();
|
||||
expect(requestHeartbeatNow).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
reason: "cron:test-main-delivery-busy",
|
||||
heartbeat: { target: "last" },
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("should preserve heartbeat.target=last for wakeMode=next-heartbeat main jobs", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
const now = Date.now();
|
||||
|
||||
@@ -112,9 +145,13 @@ describe("cron main job passes heartbeat target=last", () => {
|
||||
|
||||
await runSingleTick(cron);
|
||||
|
||||
// wakeMode=next-heartbeat uses requestHeartbeatNow, not runHeartbeatOnce
|
||||
expect(requestHeartbeatNow).toHaveBeenCalled();
|
||||
// runHeartbeatOnce should NOT have been called for next-heartbeat mode
|
||||
expect(requestHeartbeatNow).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
reason: "cron:test-next-heartbeat",
|
||||
heartbeat: { target: "last" },
|
||||
}),
|
||||
);
|
||||
expect(runHeartbeatOnce).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import type { CronConfig } from "../../config/types.cron.js";
|
||||
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
|
||||
import type { HeartbeatRunResult, HeartbeatWakeRequest } from "../../infra/heartbeat-wake.js";
|
||||
import type {
|
||||
CronDeliveryStatus,
|
||||
CronJob,
|
||||
@@ -64,7 +64,7 @@ export type CronServiceDeps = {
|
||||
text: string,
|
||||
opts?: { agentId?: string; sessionKey?: string; contextKey?: string; trusted?: boolean },
|
||||
) => void;
|
||||
requestHeartbeatNow: (opts?: { reason?: string; agentId?: string; sessionKey?: string }) => void;
|
||||
requestHeartbeatNow: (opts?: HeartbeatWakeRequest) => void;
|
||||
runHeartbeatOnce?: (opts?: {
|
||||
reason?: string;
|
||||
agentId?: string;
|
||||
|
||||
@@ -65,6 +65,7 @@ describe("cron service timer seam coverage", () => {
|
||||
reason: "cron:main-heartbeat-job",
|
||||
agentId: undefined,
|
||||
sessionKey: "agent:main:main",
|
||||
heartbeat: { target: "last" },
|
||||
});
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as {
|
||||
|
||||
@@ -1220,6 +1220,7 @@ async function executeMainSessionCronJob(
|
||||
reason,
|
||||
agentId: job.agentId,
|
||||
sessionKey: targetMainSessionKey,
|
||||
heartbeat: { target: "last" },
|
||||
});
|
||||
return { status: "ok", summary: text };
|
||||
}
|
||||
@@ -1234,6 +1235,7 @@ async function executeMainSessionCronJob(
|
||||
reason,
|
||||
agentId: job.agentId,
|
||||
sessionKey: targetMainSessionKey,
|
||||
heartbeat: { target: "last" },
|
||||
});
|
||||
return { status: "ok", summary: text };
|
||||
}
|
||||
@@ -1256,6 +1258,7 @@ async function executeMainSessionCronJob(
|
||||
reason: `cron:${job.id}`,
|
||||
agentId: job.agentId,
|
||||
sessionKey: targetMainSessionKey,
|
||||
heartbeat: { target: "last" },
|
||||
});
|
||||
return { status: "ok", summary: text };
|
||||
}
|
||||
|
||||
@@ -140,6 +140,48 @@ describe("buildGatewayCronService", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("forwards heartbeat overrides through the cron wake adapter", () => {
|
||||
const cfg = createCronConfig("server-cron-heartbeat-override");
|
||||
loadConfigMock.mockReturnValue(cfg);
|
||||
|
||||
const state = buildGatewayCronService({
|
||||
cfg,
|
||||
deps: {} as CliDeps,
|
||||
broadcast: () => {},
|
||||
});
|
||||
try {
|
||||
const cronDeps = (
|
||||
state.cron as unknown as {
|
||||
state?: {
|
||||
deps?: {
|
||||
requestHeartbeatNow?: (opts?: {
|
||||
agentId?: string;
|
||||
sessionKey?: string | null;
|
||||
reason?: string;
|
||||
heartbeat?: { target?: string };
|
||||
}) => void;
|
||||
};
|
||||
};
|
||||
}
|
||||
).state?.deps;
|
||||
|
||||
cronDeps?.requestHeartbeatNow?.({
|
||||
reason: "cron:test",
|
||||
sessionKey: "discord:channel:ops",
|
||||
heartbeat: { target: "last" },
|
||||
});
|
||||
|
||||
expect(requestHeartbeatNowMock).toHaveBeenCalledWith({
|
||||
reason: "cron:test",
|
||||
agentId: "main",
|
||||
sessionKey: "agent:main:discord:channel:ops",
|
||||
heartbeat: { target: "last" },
|
||||
});
|
||||
} finally {
|
||||
state.cron.stop();
|
||||
}
|
||||
});
|
||||
|
||||
it("preserves trust downgrades when cron enqueues system events", () => {
|
||||
const cfg = createCronConfig("server-cron-untrusted");
|
||||
loadConfigMock.mockReturnValue(cfg);
|
||||
|
||||
@@ -297,6 +297,7 @@ export function buildGatewayCronService(params: {
|
||||
reason: opts?.reason,
|
||||
agentId,
|
||||
sessionKey,
|
||||
heartbeat: opts?.heartbeat,
|
||||
});
|
||||
},
|
||||
runHeartbeatOnce: async (opts) => {
|
||||
|
||||
@@ -58,7 +58,7 @@ describe("startHeartbeatRunner", () => {
|
||||
async function expectWakeDispatch(params: {
|
||||
cfg: OpenClawConfig;
|
||||
runSpy: RunOnce;
|
||||
wake: { reason: string; agentId?: string; sessionKey?: string; coalesceMs: number };
|
||||
wake: Parameters<typeof requestHeartbeatNow>[0];
|
||||
expectedCall: Record<string, unknown>;
|
||||
}) {
|
||||
const runner = startHeartbeatRunner({
|
||||
@@ -305,6 +305,47 @@ describe("startHeartbeatRunner", () => {
|
||||
runner.stop();
|
||||
});
|
||||
|
||||
it("merges targeted wake heartbeat overrides onto the agent heartbeat config", async () => {
|
||||
useFakeHeartbeatTime();
|
||||
const runSpy = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 });
|
||||
const runner = await expectWakeDispatch({
|
||||
cfg: {
|
||||
...heartbeatConfig([
|
||||
{
|
||||
id: "ops",
|
||||
heartbeat: {
|
||||
every: "15m",
|
||||
prompt: "Ops prompt",
|
||||
directPolicy: "block",
|
||||
target: "discord:channel:ops",
|
||||
},
|
||||
},
|
||||
]),
|
||||
} as OpenClawConfig,
|
||||
runSpy,
|
||||
wake: {
|
||||
reason: "cron:job-123",
|
||||
agentId: "ops",
|
||||
sessionKey: "agent:ops:discord:channel:alerts",
|
||||
heartbeat: { target: "last" },
|
||||
coalesceMs: 0,
|
||||
},
|
||||
expectedCall: {
|
||||
agentId: "ops",
|
||||
reason: "cron:job-123",
|
||||
sessionKey: "agent:ops:discord:channel:alerts",
|
||||
heartbeat: {
|
||||
every: "15m",
|
||||
prompt: "Ops prompt",
|
||||
directPolicy: "block",
|
||||
target: "last",
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
runner.stop();
|
||||
});
|
||||
|
||||
it("does not fan out to unrelated agents for session-scoped exec wakes", async () => {
|
||||
useFakeHeartbeatTime();
|
||||
const runSpy = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 });
|
||||
|
||||
@@ -86,6 +86,7 @@ import {
|
||||
areHeartbeatsEnabled,
|
||||
type HeartbeatRunResult,
|
||||
type HeartbeatWakeHandler,
|
||||
type HeartbeatWakeRequest,
|
||||
requestHeartbeatNow,
|
||||
setHeartbeatsEnabled,
|
||||
setHeartbeatWakeHandler,
|
||||
@@ -1409,6 +1410,9 @@ export function startHeartbeatRunner(opts: {
|
||||
const reason = params?.reason;
|
||||
const requestedAgentId = params?.agentId ? normalizeAgentId(params.agentId) : undefined;
|
||||
const requestedSessionKey = normalizeOptionalString(params?.sessionKey);
|
||||
const requestedHeartbeat = params?.heartbeat;
|
||||
const resolveRequestedHeartbeat = (heartbeat?: HeartbeatConfig) =>
|
||||
requestedHeartbeat ? { ...heartbeat, ...requestedHeartbeat } : heartbeat;
|
||||
const isInterval = reason === "interval";
|
||||
const startedAt = Date.now();
|
||||
const now = startedAt;
|
||||
@@ -1428,7 +1432,7 @@ export function startHeartbeatRunner(opts: {
|
||||
const res = await runOnce({
|
||||
cfg: state.cfg,
|
||||
agentId: targetAgent.agentId,
|
||||
heartbeat: targetAgent.heartbeat,
|
||||
heartbeat: resolveRequestedHeartbeat(targetAgent.heartbeat),
|
||||
reason,
|
||||
sessionKey: requestedSessionKey,
|
||||
deps: { runtime: state.runtime },
|
||||
@@ -1496,11 +1500,12 @@ export function startHeartbeatRunner(opts: {
|
||||
}
|
||||
};
|
||||
|
||||
const wakeHandler: HeartbeatWakeHandler = async (params) =>
|
||||
const wakeHandler: HeartbeatWakeHandler = async (params: HeartbeatWakeRequest) =>
|
||||
run({
|
||||
reason: params.reason,
|
||||
agentId: params.agentId,
|
||||
sessionKey: params.sessionKey,
|
||||
heartbeat: params.heartbeat,
|
||||
});
|
||||
const disposeWakeHandler = setHeartbeatWakeHandler(wakeHandler);
|
||||
updateConfig(state.cfg);
|
||||
|
||||
@@ -263,6 +263,7 @@ describe("heartbeat-wake", () => {
|
||||
reason: "cron:job-1",
|
||||
agentId: "ops",
|
||||
sessionKey: "agent:ops:discord:channel:alerts",
|
||||
heartbeat: { target: "last" },
|
||||
coalesceMs: 0,
|
||||
});
|
||||
|
||||
@@ -272,6 +273,7 @@ describe("heartbeat-wake", () => {
|
||||
reason: "cron:job-1",
|
||||
agentId: "ops",
|
||||
sessionKey: "agent:ops:discord:channel:alerts",
|
||||
heartbeat: { target: "last" },
|
||||
});
|
||||
|
||||
await vi.advanceTimersByTimeAsync(1000);
|
||||
@@ -280,6 +282,37 @@ describe("heartbeat-wake", () => {
|
||||
reason: "cron:job-1",
|
||||
agentId: "ops",
|
||||
sessionKey: "agent:ops:discord:channel:alerts",
|
||||
heartbeat: { target: "last" },
|
||||
});
|
||||
});
|
||||
|
||||
it("preserves heartbeat override when same-target wakes coalesce", async () => {
|
||||
vi.useFakeTimers();
|
||||
const handler = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 });
|
||||
setHeartbeatWakeHandler(handler);
|
||||
|
||||
requestHeartbeatNow({
|
||||
reason: "manual",
|
||||
agentId: "ops",
|
||||
sessionKey: "agent:ops:discord:channel:alerts",
|
||||
heartbeat: { target: "last" },
|
||||
coalesceMs: 100,
|
||||
});
|
||||
requestHeartbeatNow({
|
||||
reason: "manual",
|
||||
agentId: "ops",
|
||||
sessionKey: "agent:ops:discord:channel:alerts",
|
||||
coalesceMs: 100,
|
||||
});
|
||||
|
||||
await vi.advanceTimersByTimeAsync(100);
|
||||
|
||||
expect(handler).toHaveBeenCalledTimes(1);
|
||||
expect(handler).toHaveBeenCalledWith({
|
||||
reason: "manual",
|
||||
agentId: "ops",
|
||||
sessionKey: "agent:ops:discord:channel:alerts",
|
||||
heartbeat: { target: "last" },
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -10,11 +10,14 @@ export type HeartbeatRunResult =
|
||||
| { status: "skipped"; reason: string }
|
||||
| { status: "failed"; reason: string };
|
||||
|
||||
export type HeartbeatWakeHandler = (opts: {
|
||||
export type HeartbeatWakeRequest = {
|
||||
reason?: string;
|
||||
agentId?: string;
|
||||
sessionKey?: string;
|
||||
}) => Promise<HeartbeatRunResult>;
|
||||
heartbeat?: { target?: string };
|
||||
};
|
||||
|
||||
export type HeartbeatWakeHandler = (opts: HeartbeatWakeRequest) => Promise<HeartbeatRunResult>;
|
||||
|
||||
let heartbeatsEnabled = true;
|
||||
|
||||
@@ -33,6 +36,7 @@ type PendingWakeReason = {
|
||||
requestedAt: number;
|
||||
agentId?: string;
|
||||
sessionKey?: string;
|
||||
heartbeat?: { target?: string };
|
||||
};
|
||||
|
||||
let handler: HeartbeatWakeHandler | null = null;
|
||||
@@ -87,6 +91,7 @@ function queuePendingWakeReason(params?: {
|
||||
requestedAt?: number;
|
||||
agentId?: string;
|
||||
sessionKey?: string;
|
||||
heartbeat?: { target?: string };
|
||||
}) {
|
||||
const requestedAt = params?.requestedAt ?? Date.now();
|
||||
const normalizedReason = normalizeWakeReason(params?.reason);
|
||||
@@ -102,18 +107,23 @@ function queuePendingWakeReason(params?: {
|
||||
requestedAt,
|
||||
agentId: normalizedAgentId,
|
||||
sessionKey: normalizedSessionKey,
|
||||
heartbeat: params?.heartbeat,
|
||||
};
|
||||
const previous = pendingWakes.get(wakeTargetKey);
|
||||
if (!previous) {
|
||||
pendingWakes.set(wakeTargetKey, next);
|
||||
return;
|
||||
}
|
||||
const merged =
|
||||
(next.heartbeat ?? previous.heartbeat)
|
||||
? { ...next, heartbeat: next.heartbeat ?? previous.heartbeat }
|
||||
: next;
|
||||
if (next.priority > previous.priority) {
|
||||
pendingWakes.set(wakeTargetKey, next);
|
||||
pendingWakes.set(wakeTargetKey, merged);
|
||||
return;
|
||||
}
|
||||
if (next.priority === previous.priority && next.requestedAt >= previous.requestedAt) {
|
||||
pendingWakes.set(wakeTargetKey, next);
|
||||
pendingWakes.set(wakeTargetKey, merged);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -162,6 +172,7 @@ function schedule(coalesceMs: number, kind: WakeTimerKind = "normal") {
|
||||
reason: pendingWake.reason ?? undefined,
|
||||
...(pendingWake.agentId ? { agentId: pendingWake.agentId } : {}),
|
||||
...(pendingWake.sessionKey ? { sessionKey: pendingWake.sessionKey } : {}),
|
||||
...(pendingWake.heartbeat ? { heartbeat: pendingWake.heartbeat } : {}),
|
||||
};
|
||||
const res = await active(wakeOpts);
|
||||
if (res.status === "skipped" && res.reason === "requests-in-flight") {
|
||||
@@ -170,6 +181,7 @@ function schedule(coalesceMs: number, kind: WakeTimerKind = "normal") {
|
||||
reason: pendingWake.reason ?? "retry",
|
||||
agentId: pendingWake.agentId,
|
||||
sessionKey: pendingWake.sessionKey,
|
||||
heartbeat: pendingWake.heartbeat,
|
||||
});
|
||||
schedule(DEFAULT_RETRY_MS, "retry");
|
||||
}
|
||||
@@ -181,6 +193,7 @@ function schedule(coalesceMs: number, kind: WakeTimerKind = "normal") {
|
||||
reason: pendingWake.reason ?? "retry",
|
||||
agentId: pendingWake.agentId,
|
||||
sessionKey: pendingWake.sessionKey,
|
||||
heartbeat: pendingWake.heartbeat,
|
||||
});
|
||||
}
|
||||
schedule(DEFAULT_RETRY_MS, "retry");
|
||||
@@ -241,11 +254,13 @@ export function requestHeartbeatNow(opts?: {
|
||||
coalesceMs?: number;
|
||||
agentId?: string;
|
||||
sessionKey?: string;
|
||||
heartbeat?: { target?: string };
|
||||
}) {
|
||||
queuePendingWakeReason({
|
||||
reason: opts?.reason,
|
||||
agentId: opts?.agentId,
|
||||
sessionKey: opts?.sessionKey,
|
||||
heartbeat: opts?.heartbeat,
|
||||
});
|
||||
schedule(opts?.coalesceMs ?? DEFAULT_COALESCE_MS, "normal");
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user