fix(heartbeat): defer during cron and nested lane pressure

This commit is contained in:
Peter Steinberger
2026-04-29 10:08:11 +01:00
parent 422d139ba0
commit f5e7557c70
22 changed files with 422 additions and 46 deletions

View File

@@ -21,6 +21,7 @@ Docs: https://docs.openclaw.ai
- Channels/Discord: split long CJK replies at punctuation and code-point-safe fallback boundaries so Discord chunking stays readable without corrupting astral characters. Fixes #38597; repairs #71384. Thanks @p3nchan.
- Browser/gateway: ignore Playwright dialog-close races from `Page.handleJavaScriptDialog` so browser automation no longer crashes the Gateway when a dialog disappears before Playwright accepts it. (#40067) Thanks @randyjtw.
- Cron/Gateway: defer missed isolated agent-turn catch-up out of the channel startup window, so overdue cron work cannot starve Discord or Telegram while providers connect after a restart. Thanks @vincentkoc.
- Heartbeat/cron: defer heartbeat turns while cron work is active or queued, add opt-in `heartbeat.skipWhenBusy` for subagent/nested lane pressure, and retry busy skips without advancing the schedule so local Ollama hosts do not run heartbeat and cron prompts concurrently. Fixes #50773. Thanks @scottgl9.
- Plugins/runtime-deps: prune stale `openclaw-unknown-*` bundled runtime dependency roots during Gateway startup while keeping recent or locked roots, so old staging debris cannot keep growing across restarts. Thanks @vincentkoc.
- Ollama: compose caller abort signals with guarded-fetch timeouts for native `/api/chat` streams, so `/stop` and early cancellation still interrupt local Ollama requests that also carry provider timeout budgets. Refs #74133. Thanks @obviyus.
- Doctor/TTS: migrate legacy `messages.tts.enabled`, agent TTS, channel TTS, and voice-call plugin TTS toggles to `auto` mode during `openclaw doctor --fix`, matching the documented TTS config contract. Thanks @vincentkoc.

View File

@@ -1,4 +1,4 @@
96edba9fd67fa057f6b6c43d54a168db25d0e27ddd4e91a7e2918c8657f0f212 config-baseline.json
510ed7af2e3731c8a307dbc10181328f82764a4e8dd9e9dddc6118db6f882ff7 config-baseline.core.json
592d25e08647ced4fae0c4fdbff95e50d1749c42d39070f6b6bc6a3e0475d4f0 config-baseline.json
9cd2c40b4a45976b74458f9ada8ecc31c532ee81f10145a9828bbff31777c03e config-baseline.core.json
9f5fad66a49fa618d64a963470aa69fed9fe4b4639cc4321f9ec04bfb2f8aa50 config-baseline.channel.json
0dd6583fafae6c9134e46c4cf9bddee9822d6436436dcb1a6dcba6d012962e51 config-baseline.plugin.json

View File

@@ -93,7 +93,7 @@ See [Hooks](/automation/hooks).
### Heartbeat
Heartbeat is a periodic main-session turn (default every 30 minutes). It batches multiple checks (inbox, calendar, notifications) in one agent turn with full session context. Heartbeat turns do not create task records and do not extend daily/idle session reset freshness. Use `HEARTBEAT.md` for a small checklist, or a `tasks:` block when you want due-only periodic checks inside heartbeat itself. Empty heartbeat files skip as `empty-heartbeat-file`; due-only task mode skips as `no-tasks-due`.
Heartbeat is a periodic main-session turn (default every 30 minutes). It batches multiple checks (inbox, calendar, notifications) in one agent turn with full session context. Heartbeat turns do not create task records and do not extend daily/idle session reset freshness. Use `HEARTBEAT.md` for a small checklist, or a `tasks:` block when you want due-only periodic checks inside heartbeat itself. Empty heartbeat files skip as `empty-heartbeat-file`; due-only task mode skips as `no-tasks-due`. Heartbeats defer while cron work is active or queued, and `heartbeat.skipWhenBusy` can also defer them while subagent or nested lanes are busy.
See [Heartbeat](/gateway/heartbeat).

View File

@@ -516,6 +516,7 @@ Periodic heartbeat runs.
includeSystemPromptSection: true, // default: true; false omits the Heartbeat section from the system prompt
lightContext: false, // default: false; true keeps only HEARTBEAT.md from workspace bootstrap files
isolatedSession: false, // default: false; true runs each heartbeat in a fresh session (no conversation history)
skipWhenBusy: false, // default: false; true also waits for subagent/nested lanes
session: "main",
to: "+15555550123",
directPolicy: "allow", // allow (default) | block
@@ -537,6 +538,7 @@ Periodic heartbeat runs.
- `directPolicy`: direct/DM delivery policy. `allow` (default) permits direct-target delivery. `block` suppresses direct-target delivery and emits `reason=dm-blocked`.
- `lightContext`: when true, heartbeat runs use lightweight bootstrap context and keep only `HEARTBEAT.md` from workspace bootstrap files.
- `isolatedSession`: when true, each heartbeat runs in a fresh session with no prior conversation history. Same isolation pattern as cron `sessionTarget: "isolated"`. Reduces per-heartbeat token cost from ~100K to ~2-5K tokens.
- `skipWhenBusy`: when true, heartbeat runs also defer while subagent or nested command lanes are busy. Active or queued cron work always defers heartbeats, even without this flag.
- Per-agent: set `agents.list[].heartbeat`. When any agent defines `heartbeat`, **only those agents** run heartbeats.
- Heartbeats run full agent turns — shorter intervals burn more tokens.

View File

@@ -50,6 +50,7 @@ Example config:
directPolicy: "allow", // default: allow direct/DM targets; set "block" to suppress
lightContext: true, // optional: only inject HEARTBEAT.md from bootstrap files
isolatedSession: true, // optional: fresh session each run (no conversation history)
skipWhenBusy: true, // optional: also defer when subagent or nested lanes are busy
// activeHours: { start: "08:00", end: "24:00" },
// includeReasoning: true, // optional: send separate `Reasoning:` message too
},
@@ -65,6 +66,7 @@ Example config:
- The heartbeat prompt is sent **verbatim** as the user message. The system prompt includes a "Heartbeat" section only when heartbeats are enabled for the default agent, and the run is flagged internally.
- When heartbeats are disabled with `0m`, normal runs also omit `HEARTBEAT.md` from bootstrap context so the model does not see heartbeat-only instructions.
- Active hours (`heartbeat.activeHours`) are checked in the configured timezone. Outside the window, heartbeats are skipped until the next tick inside the window.
- Heartbeats automatically defer while cron work is active or queued. Set `heartbeat.skipWhenBusy: true` to defer on extra busy lanes (subagent or nested command work) as well; this is useful for local Ollama and other constrained single-runtime hosts.
## What the heartbeat prompt is for
@@ -98,6 +100,7 @@ Outside heartbeats, stray `HEARTBEAT_OK` at the start/end of a message is stripp
includeReasoning: false, // default: false (deliver separate Reasoning: message when available)
lightContext: false, // default: false; true keeps only HEARTBEAT.md from workspace bootstrap files
isolatedSession: false, // default: false; true runs each heartbeat in a fresh session (no conversation history)
skipWhenBusy: false, // default: false; true also waits for subagent/nested lanes
target: "last", // default: none | options: last | none | <channel id> (core or plugin, e.g. "bluebubbles")
to: "+15551234567", // optional channel-specific override
accountId: "ops-bot", // optional multi-account channel id
@@ -230,6 +233,9 @@ Use `accountId` to target a specific account on multi-account channels like Tele
<ParamField path="isolatedSession" type="boolean" default="false">
When true, each heartbeat runs in a fresh session with no prior conversation history. Uses the same isolation pattern as cron `sessionTarget: "isolated"`. Dramatically reduces per-heartbeat token cost. Combine with `lightContext: true` for maximum savings. Delivery routing still uses the main session context.
</ParamField>
<ParamField path="skipWhenBusy" type="boolean" default="false">
When true, heartbeat runs also defer while subagent or nested command lanes are busy. Active or queued cron work always defers heartbeats, even without this flag, so local-model hosts do not run cron and heartbeat prompts at the same time.
</ParamField>
<ParamField path="session" type="string">
Optional session key for heartbeat runs.
@@ -287,7 +293,8 @@ Use `accountId` to target a specific account on multi-account channels like Tele
- `session` only affects the run context; delivery is controlled by `target` and `to`.
- To deliver to a specific channel/recipient, set `target` + `to`. With `target: "last"`, delivery uses the last external channel for that session.
- Heartbeat deliveries allow direct/DM targets by default. Set `directPolicy: "block"` to suppress direct-target sends while still running the heartbeat turn.
- If the main queue is busy, the heartbeat is skipped and retried later.
- If the main queue, target session lane, cron lane, or an active cron job is busy, the heartbeat is skipped and retried later.
- If `skipWhenBusy: true`, subagent and nested lanes also defer heartbeat runs.
- If `target` resolves to no external destination, the run still happens but no outbound message is sent.
</Accordion>

View File

@@ -437,7 +437,7 @@ Look for:
- Cron enabled and next wake present.
- Job run history status (`ok`, `skipped`, `error`).
- Heartbeat skip reasons (`quiet-hours`, `requests-in-flight`, `alerts-disabled`, `empty-heartbeat-file`, `no-tasks-due`).
- Heartbeat skip reasons (`quiet-hours`, `requests-in-flight`, `cron-in-progress`, `lanes-busy`, `alerts-disabled`, `empty-heartbeat-file`, `no-tasks-due`).
<AccordionGroup>
<Accordion title="Common signatures">

View File

@@ -5464,6 +5464,12 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = {
isolatedSession: {
type: "boolean",
},
skipWhenBusy: {
type: "boolean",
title: "Heartbeat Skip When Busy",
description:
"When true, defer heartbeat turns on extra busy lanes: subagent or nested command work. Cron lanes always defer heartbeat turns.",
},
},
additionalProperties: false,
},
@@ -7198,6 +7204,12 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = {
isolatedSession: {
type: "boolean",
},
skipWhenBusy: {
type: "boolean",
title: "Heartbeat Skip When Busy",
description:
"Per-agent override that defers heartbeat turns on extra busy lanes: subagent or nested command work. Cron lanes always defer heartbeat turns.",
},
},
additionalProperties: false,
},
@@ -27261,6 +27273,16 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = {
label: "Heartbeat Timeout (Seconds)",
tags: ["performance", "automation"],
},
"agents.defaults.heartbeat.skipWhenBusy": {
label: "Heartbeat Skip When Busy",
help: "When true, defer heartbeat turns on extra busy lanes: subagent or nested command work. Cron lanes always defer heartbeat turns.",
tags: ["automation"],
},
"agents.list.*.heartbeat.skipWhenBusy": {
label: "Heartbeat Skip When Busy",
help: "Per-agent override that defers heartbeat turns on extra busy lanes: subagent or nested command work. Cron lanes always defer heartbeat turns.",
tags: ["automation"],
},
"agents.defaults.sandbox.browser.network": {
label: "Sandbox Browser Network",
help: "Docker network for sandbox browser containers (default: openclaw-sandbox-browser). Avoid bridge if you need stricter isolation.",
@@ -28419,6 +28441,11 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = {
help: "Per-agent maximum time in seconds allowed for a heartbeat agent turn before it is aborted. Leave unset to inherit the merged heartbeat/default agent timeout.",
tags: ["performance", "automation"],
},
"agents.list[].heartbeat.skipWhenBusy": {
label: "Agent Heartbeat Skip When Busy",
help: "Per-agent override that defers heartbeat turns on extra busy lanes: subagent or nested command work. Cron lanes always defer heartbeat turns.",
tags: ["automation"],
},
"agents.list[].sandbox.browser.network": {
label: "Agent Sandbox Browser Network",
help: "Per-agent override for sandbox browser Docker network.",

View File

@@ -256,6 +256,10 @@ export const FIELD_HELP: Record<string, string> = {
"Maximum time in seconds allowed for a heartbeat agent turn before it is aborted. Leave unset to use agents.defaults.timeoutSeconds.",
"agents.list[].heartbeat.timeoutSeconds":
"Per-agent maximum time in seconds allowed for a heartbeat agent turn before it is aborted. Leave unset to inherit the merged heartbeat/default agent timeout.",
"agents.defaults.heartbeat.skipWhenBusy":
"When true, defer heartbeat turns on extra busy lanes: subagent or nested command work. Cron lanes always defer heartbeat turns.",
"agents.list[].heartbeat.skipWhenBusy":
"Per-agent override that defers heartbeat turns on extra busy lanes: subagent or nested command work. Cron lanes always defer heartbeat turns.",
browser:
"Browser runtime controls for local or remote CDP attachment, profile routing, and screenshot/snapshot behavior. Keep defaults unless your automation workflow requires custom browser transport settings.",
"browser.enabled":
@@ -1684,6 +1688,8 @@ export const FIELD_HELP: Record<string, string> = {
'Controls whether heartbeat delivery may target direct/DM chats: "allow" (default) permits DM delivery and "block" suppresses direct-target sends.',
"agents.list.*.heartbeat.directPolicy":
'Per-agent override for heartbeat direct/DM delivery policy; use "block" for agents that should only send heartbeat alerts to non-DM destinations.',
"agents.list.*.heartbeat.skipWhenBusy":
"Per-agent override that defers heartbeat turns on extra busy lanes: subagent or nested command work. Cron lanes always defer heartbeat turns.",
"channels.mattermost.configWrites":
"Allow Mattermost to write config in response to channel events/commands (default: true).",
"channels.modelByChannel":

View File

@@ -634,6 +634,8 @@ export const FIELD_LABELS: Record<string, string> = {
"agents.list.*.heartbeat.suppressToolErrorWarnings": "Heartbeat Suppress Tool Error Warnings",
"agents.defaults.heartbeat.timeoutSeconds": "Heartbeat Timeout (Seconds)",
"agents.list.*.heartbeat.timeoutSeconds": "Heartbeat Timeout (Seconds)",
"agents.defaults.heartbeat.skipWhenBusy": "Heartbeat Skip When Busy",
"agents.list.*.heartbeat.skipWhenBusy": "Heartbeat Skip When Busy",
"agents.defaults.sandbox.browser.network": "Sandbox Browser Network",
"agents.defaults.sandbox.browser.cdpSourceRange": "Sandbox Browser CDP Source Port Range",
"agents.defaults.sandbox.docker.dangerouslyAllowContainerNamespaceJoin":
@@ -883,6 +885,7 @@ export const FIELD_LABELS: Record<string, string> = {
"agents.list[].heartbeat.suppressToolErrorWarnings":
"Agent Heartbeat Suppress Tool Error Warnings",
"agents.list[].heartbeat.timeoutSeconds": "Agent Heartbeat Timeout (Seconds)",
"agents.list[].heartbeat.skipWhenBusy": "Agent Heartbeat Skip When Busy",
"agents.list[].sandbox.browser.network": "Agent Sandbox Browser Network",
"agents.list[].sandbox.browser.cdpSourceRange": "Agent Sandbox Browser CDP Source Port Range",
"agents.list[].sandbox.docker.dangerouslyAllowContainerNamespaceJoin":

View File

@@ -378,6 +378,11 @@ export type AgentDefaultsConfig = {
* per-heartbeat token cost by avoiding the full session transcript.
*/
isolatedSession?: boolean;
/**
* If true, defer heartbeat runs while subagent or nested command lanes are busy.
* Cron lanes are always treated as busy for heartbeat deferral.
*/
skipWhenBusy?: boolean;
/**
* When enabled, deliver the model's reasoning payload for heartbeat runs (when available)
* as a separate message prefixed with `Reasoning:` (same as `/reasoning on`).

View File

@@ -131,15 +131,17 @@ describe("agent defaults schema", () => {
it("accepts positive heartbeat timeoutSeconds on defaults and agent entries", () => {
const defaults = AgentDefaultsSchema.parse({
heartbeat: { timeoutSeconds: 45 },
heartbeat: { timeoutSeconds: 45, skipWhenBusy: true },
})!;
const agent = AgentEntrySchema.parse({
id: "ops",
heartbeat: { timeoutSeconds: 45 },
heartbeat: { timeoutSeconds: 45, skipWhenBusy: true },
});
expect(defaults.heartbeat?.timeoutSeconds).toBe(45);
expect(defaults.heartbeat?.skipWhenBusy).toBe(true);
expect(agent.heartbeat?.timeoutSeconds).toBe(45);
expect(agent.heartbeat?.skipWhenBusy).toBe(true);
});
it("accepts per-agent TTS overrides", () => {

View File

@@ -42,6 +42,7 @@ export const HeartbeatSchema = z
timeoutSeconds: z.number().int().positive().optional(),
lightContext: z.boolean().optional(),
isolatedSession: z.boolean().optional(),
skipWhenBusy: z.boolean().optional(),
})
.strict()
.superRefine((val, ctx) => {

View File

@@ -33,6 +33,10 @@ export function isCronJobActive(jobId: string) {
return getCronActiveJobState().activeJobIds.has(jobId);
}
/**
 * Reports whether the shared cron active-job state is currently tracking at
 * least one job id.
 *
 * @returns `true` when the set of active job ids is non-empty.
 */
export function hasActiveCronJobs(): boolean {
  const { activeJobIds } = getCronActiveJobState();
  return activeJobIds.size > 0;
}
/**
 * Empties the tracked set of active cron job ids. As the name indicates, this
 * is a test-support hook for returning the shared state to a clean baseline.
 */
export function resetCronActiveJobsForTests(): void {
  const { activeJobIds } = getCronActiveJobState();
  activeJobIds.clear();
}

View File

@@ -1,6 +1,10 @@
import path from "node:path";
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { HeartbeatRunResult } from "../infra/heartbeat-wake.js";
import {
HEARTBEAT_SKIP_CRON_IN_PROGRESS,
HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT,
type HeartbeatRunResult,
} from "../infra/heartbeat-wake.js";
import type { CronEvent, CronServiceDeps } from "./service.js";
import { CronService } from "./service.js";
import { createDeferred, createNoopLogger, installCronTestHooks } from "./service.test-harness.js";
@@ -524,7 +528,7 @@ describe("CronService", () => {
it("wakeMode now falls back to queued heartbeat when main lane stays busy", async () => {
const runHeartbeatOnce = vi.fn(async () => ({
status: "skipped" as const,
reason: "requests-in-flight",
reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT,
}));
let now = 0;
const nowMs = () => {
@@ -562,6 +566,38 @@ describe("CronService", () => {
await stopCronAndCleanup(cron, store);
});
// Verifies the wake-now fallback when the direct heartbeat attempt is skipped
// with the cron-in-progress reason: the job should hand off to the queued
// heartbeat path instead of retrying synchronously.
it("wakeMode now queues heartbeat when cron active marker blocks synchronous wake", async () => {
// Stub that always reports a cron-in-progress skip.
const runHeartbeatOnce = vi.fn(async () => ({
status: "skipped" as const,
reason: HEARTBEAT_SKIP_CRON_IN_PROGRESS,
}));
const { store, cron, requestHeartbeatNow } = await createWakeModeNowMainHarness({
runHeartbeatOnce,
});
const sessionKey = "agent:main:discord:channel:ops";
const job = await addWakeModeNowMainSystemEventJob(cron, {
name: "wakeMode now cron marker fallback",
sessionKey,
});
await cron.run(job.id, "force");
// Exactly one direct attempt, then a queued wake carrying the job's reason
// and the job's session key.
expect(runHeartbeatOnce).toHaveBeenCalledTimes(1);
expect(requestHeartbeatNow).toHaveBeenCalledWith(
expect.objectContaining({
reason: `cron:${job.id}`,
sessionKey,
}),
);
// The skip must not be recorded as a job failure.
expect(job.state.lastStatus).toBe("ok");
expect(job.state.lastError).toBeUndefined();
await cron.list({ includeDisabled: true });
await stopCronAndCleanup(cron, store);
});
it("runs an isolated job without posting a fallback summary to main", async () => {
const runIsolatedAgentJob = vi.fn(async () => ({ status: "ok" as const, summary: "done" }));
const { store, cron, enqueueSystemEvent, requestHeartbeatNow, events } =

View File

@@ -1,6 +1,10 @@
import { resolveFailoverReasonFromError } from "../../agents/failover-error.js";
import type { CronConfig, CronRetryOn } from "../../config/types.cron.js";
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
import {
HEARTBEAT_SKIP_CRON_IN_PROGRESS,
isRetryableHeartbeatBusySkipReason,
} from "../../infra/heartbeat-wake.js";
import { DEFAULT_AGENT_ID } from "../../routing/session-key.js";
import { normalizeOptionalLowercaseString } from "../../shared/string-coerce.js";
import {
@@ -1350,13 +1354,17 @@ async function executeMainSessionCronJob(
sessionKey: targetMainSessionKey,
heartbeat: { target: "last" },
});
if (heartbeatResult.status !== "skipped" || heartbeatResult.reason !== "requests-in-flight") {
if (
heartbeatResult.status !== "skipped" ||
!isRetryableHeartbeatBusySkipReason(heartbeatResult.reason)
) {
break;
}
if (isRecurringJob) {
if (isRecurringJob || heartbeatResult.reason === HEARTBEAT_SKIP_CRON_IN_PROGRESS) {
// Recurring main-session cron jobs should not hold the cron lane open
// while the main lane is busy, or their measured duration starts to
// reflect queue wait instead of cron bookkeeping (#58833).
// while runtime lanes are busy. A cron-in-progress skip is caused by
// this job's own active marker, so direct wake-now cannot succeed until
// the cron job returns and clears it (#50773).
state.deps.requestHeartbeatNow({
reason,
agentId: job.agentId,

View File

@@ -2,7 +2,13 @@ import { afterEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../config/config.js";
import { startHeartbeatRunner } from "./heartbeat-runner.js";
import { computeNextHeartbeatPhaseDueMs, resolveHeartbeatPhaseMs } from "./heartbeat-schedule.js";
import { requestHeartbeatNow, resetHeartbeatWakeStateForTests } from "./heartbeat-wake.js";
import {
HEARTBEAT_SKIP_CRON_IN_PROGRESS,
HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT,
type RetryableHeartbeatBusySkipReason,
requestHeartbeatNow,
resetHeartbeatWakeStateForTests,
} from "./heartbeat-wake.js";
describe("startHeartbeatRunner", () => {
type RunOnce = Parameters<typeof startHeartbeatRunner>[0]["runOnce"];
@@ -44,12 +50,12 @@ describe("startHeartbeatRunner", () => {
});
}
function createRequestsInFlightRunSpy(skipCount: number) {
function createRetryableBusyRunSpy(reason: RetryableHeartbeatBusySkipReason, skipCount: number) {
let callCount = 0;
return vi.fn().mockImplementation(async () => {
callCount++;
if (callCount <= skipCount) {
return { status: "skipped", reason: "requests-in-flight" } as const;
return { status: "skipped", reason } as const;
}
return { status: "ran", durationMs: 1 } as const;
});
@@ -214,7 +220,7 @@ describe("startHeartbeatRunner", () => {
it("reschedules timer when runOnce returns requests-in-flight", async () => {
useFakeHeartbeatTime();
const runSpy = createRequestsInFlightRunSpy(1);
const runSpy = createRetryableBusyRunSpy(HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT, 1);
const runner = startHeartbeatRunner({
cfg: heartbeatConfig(),
@@ -235,6 +241,27 @@ describe("startHeartbeatRunner", () => {
runner.stop();
});
// Verifies that a cron-in-progress skip is retried shortly afterwards rather
// than waiting for the next full heartbeat interval.
it("reschedules timer when runOnce returns cron-in-progress", async () => {
useFakeHeartbeatTime();
// First call is skipped with cron-in-progress; later calls report "ran".
const runSpy = createRetryableBusyRunSpy(HEARTBEAT_SKIP_CRON_IN_PROGRESS, 1);
const runner = startHeartbeatRunner({
cfg: heartbeatConfig(),
runOnce: runSpy,
stableSchedulerSeed: TEST_SCHEDULER_SEED,
});
const firstDueMs = resolveDueFromNow(0, 30 * 60_000, "main");
// Advance just past the first scheduled due time to trigger the skipped run.
await vi.advanceTimersByTimeAsync(firstDueMs + 1);
expect(runSpy).toHaveBeenCalledTimes(1);
// One more second of fake time should be enough for the retry to fire.
await vi.advanceTimersByTimeAsync(1_000);
expect(runSpy).toHaveBeenCalledTimes(2);
runner.stop();
});
it("does not push nextDueMs forward on repeated requests-in-flight skips", async () => {
useFakeHeartbeatTime();
@@ -246,7 +273,7 @@ describe("startHeartbeatRunner", () => {
callTimes.push(Date.now());
callCount++;
if (callCount <= 5) {
return { status: "skipped", reason: "requests-in-flight" } as const;
return { status: "skipped", reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT } as const;
}
return { status: "ran", durationMs: 1 } as const;
});

View File

@@ -1,9 +1,18 @@
import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import { resolveNestedAgentLaneForSession } from "../agents/lanes.js";
import type { OpenClawConfig } from "../config/config.js";
import { markCronJobActive, resetCronActiveJobsForTests } from "../cron/active-jobs.js";
import { getActivePluginRegistry, setActivePluginRegistry } from "../plugins/runtime.js";
import type { CommandLaneSnapshot } from "../process/command-queue.js";
import { CommandLane } from "../process/lanes.js";
import { createOutboundTestPlugin, createTestRegistry } from "../test-utils/channel-plugins.js";
import { type HeartbeatDeps, runHeartbeatOnce } from "./heartbeat-runner.js";
import { seedMainSessionStore, withTempHeartbeatSandbox } from "./heartbeat-runner.test-utils.js";
import {
HEARTBEAT_SKIP_CRON_IN_PROGRESS,
HEARTBEAT_SKIP_LANES_BUSY,
HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT,
} from "./heartbeat-wake.js";
import { resetSystemEventsForTest, enqueueSystemEvent } from "./system-events.js";
vi.mock("jiti", () => ({ createJiti: () => () => ({}) }));
@@ -33,6 +42,7 @@ afterAll(() => {
// Reset shared system-event and cron active-job state before every test so
// markers set by one case cannot leak into the next.
beforeEach(() => {
resetSystemEventsForTest();
resetCronActiveJobsForTests();
});
function createHeartbeatTelegramConfig(): OpenClawConfig {
@@ -61,7 +71,99 @@ async function seedHeartbeatTelegramSession(storePath: string, cfg: OpenClawConf
});
}
/**
 * Test fixture: builds a lane snapshot for `lane` with exactly one active
 * command and nothing queued.
 *
 * @param lane - Lane name to stamp on the snapshot.
 * @returns A snapshot with `activeCount: 1`, `queuedCount: 0`, `maxConcurrent: 1`,
 *   `draining: false`, and `generation: 0`.
 */
function createBusyLaneSnapshot(lane: string): CommandLaneSnapshot {
  const busySnapshot: CommandLaneSnapshot = {
    lane,
    activeCount: 1,
    queuedCount: 0,
    maxConcurrent: 1,
    draining: false,
    generation: 0,
  };
  return busySnapshot;
}
describe("heartbeat runner skips when target session lane is busy", () => {
// With every lane reporting size 0, an active cron job marker alone must be
// enough to skip the heartbeat with the cron-in-progress reason.
it("returns cron-in-progress when cron has an active job", async () => {
await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => {
const cfg = createHeartbeatTelegramConfig();
await seedHeartbeatTelegramSession(storePath, cfg);
// Mark a cron job as active; no lane has any work in this scenario.
markCronJobActive("local-model-report");
const result = await runHeartbeatOnce({
cfg,
deps: {
getQueueSize: vi.fn((_lane?: string) => 0),
nowMs: () => Date.now(),
getReplyFromConfig: replySpy,
} as HeartbeatDeps,
});
expect(result).toEqual({ status: "skipped", reason: HEARTBEAT_SKIP_CRON_IN_PROGRESS });
// The reply function must never be called for a skipped heartbeat.
expect(replySpy).not.toHaveBeenCalled();
});
});
// Without any active cron job marker, work in the cron lane alone must skip
// the heartbeat with the cron-in-progress reason.
it("returns cron-in-progress when cron lanes have queued work", async () => {
await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => {
const cfg = createHeartbeatTelegramConfig();
await seedHeartbeatTelegramSession(storePath, cfg);
const result = await runHeartbeatOnce({
cfg,
deps: {
// Only the cron lane reports work; every other lane is idle.
getQueueSize: vi.fn((lane?: string) => (lane === CommandLane.Cron ? 1 : 0)),
nowMs: () => Date.now(),
getReplyFromConfig: replySpy,
} as HeartbeatDeps,
});
expect(result).toEqual({ status: "skipped", reason: HEARTBEAT_SKIP_CRON_IN_PROGRESS });
expect(replySpy).not.toHaveBeenCalled();
});
});
// With `skipWhenBusy: true`, work in the subagent lane must skip the heartbeat
// with the lanes-busy reason.
it("returns lanes-busy for opt-in broader busy-lane checks", async () => {
await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => {
const cfg = createHeartbeatTelegramConfig();
// Opt in to the broader busy-lane check.
cfg.agents!.defaults!.heartbeat = { every: "30m", skipWhenBusy: true };
await seedHeartbeatTelegramSession(storePath, cfg);
const result = await runHeartbeatOnce({
cfg,
deps: {
// Only the subagent lane reports work.
getQueueSize: vi.fn((lane?: string) => (lane === CommandLane.Subagent ? 1 : 0)),
nowMs: () => Date.now(),
getReplyFromConfig: replySpy,
} as HeartbeatDeps,
});
expect(result).toEqual({ status: "skipped", reason: HEARTBEAT_SKIP_LANES_BUSY });
expect(replySpy).not.toHaveBeenCalled();
});
});
// With `skipWhenBusy: true`, a busy nested lane belonging to a different
// session must still skip the heartbeat. The busy lane is visible only through
// lane snapshots; every direct size lookup returns 0.
it("returns lanes-busy for opt-in work in any session-scoped nested lane", async () => {
await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => {
const cfg = createHeartbeatTelegramConfig();
cfg.agents!.defaults!.heartbeat = { every: "30m", skipWhenBusy: true };
await seedHeartbeatTelegramSession(storePath, cfg);
// Nested lane derived from another agent's session key.
const nestedSessionLane = resolveNestedAgentLaneForSession("agent:other:telegram:123");
const result = await runHeartbeatOnce({
cfg,
deps: {
getQueueSize: vi.fn((_lane?: string) => 0),
getCommandLaneSnapshots: vi.fn(() => [createBusyLaneSnapshot(nestedSessionLane)]),
nowMs: () => Date.now(),
getReplyFromConfig: replySpy,
} as HeartbeatDeps,
});
expect(result).toEqual({ status: "skipped", reason: HEARTBEAT_SKIP_LANES_BUSY });
expect(replySpy).not.toHaveBeenCalled();
});
});
it("returns requests-in-flight when session lane has queued work", async () => {
await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => {
const cfg = createHeartbeatTelegramConfig();
@@ -93,7 +195,7 @@ describe("heartbeat runner skips when target session lane is busy", () => {
expect(result.status).toBe("skipped");
if (result.status === "skipped") {
expect(result.reason).toBe("requests-in-flight");
expect(result.reason).toBe(HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT);
}
expect(replySpy).not.toHaveBeenCalled();
});

View File

@@ -12,6 +12,7 @@ import {
} from "../agents/agent-scope.js";
import { appendCronStyleCurrentTimeLine } from "../agents/current-time.js";
import { resolveEffectiveMessagesConfig } from "../agents/identity.js";
import { isNestedAgentLane } from "../agents/lanes.js";
import { resolveEmbeddedSessionLane } from "../agents/pi-embedded-runner/lanes.js";
import { DEFAULT_HEARTBEAT_FILENAME } from "../agents/workspace.js";
import { resolveHeartbeatReplyPayload } from "../auto-reply/heartbeat-reply-payload.js";
@@ -46,10 +47,15 @@ import {
} from "../config/sessions/store.js";
import type { AgentDefaultsConfig } from "../config/types.agent-defaults.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { hasActiveCronJobs } from "../cron/active-jobs.js";
import { resolveCronSession } from "../cron/isolated-agent/session.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { getActivePluginChannelRegistry } from "../plugins/runtime.js";
import { getQueueSize } from "../process/command-queue.js";
import {
getCommandLaneSnapshots,
getQueueSize,
type CommandLaneSnapshot,
} from "../process/command-queue.js";
import { CommandLane } from "../process/lanes.js";
import {
isSubagentSessionKey,
@@ -91,9 +97,13 @@ import { createHeartbeatTypingCallbacks } from "./heartbeat-typing.js";
import { resolveHeartbeatVisibility } from "./heartbeat-visibility.js";
import {
areHeartbeatsEnabled,
HEARTBEAT_SKIP_CRON_IN_PROGRESS,
HEARTBEAT_SKIP_LANES_BUSY,
HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT,
type HeartbeatRunResult,
type HeartbeatWakeHandler,
type HeartbeatWakeRequest,
isRetryableHeartbeatBusySkipReason,
requestHeartbeatNow,
setHeartbeatsEnabled,
setHeartbeatWakeHandler,
@@ -116,6 +126,7 @@ export type HeartbeatDeps = OutboundSendDeps &
getReplyFromConfig?: typeof import("./heartbeat-runner.runtime.js").getReplyFromConfig;
runtime?: RuntimeEnv;
getQueueSize?: (lane?: string) => number;
getCommandLaneSnapshots?: () => readonly CommandLaneSnapshot[];
nowMs?: () => number;
};
@@ -128,6 +139,35 @@ function loadHeartbeatRunnerRuntime() {
return heartbeatRunnerRuntimePromise;
}
// Lanes whose pending work always defers a heartbeat, regardless of configuration.
const HEARTBEAT_ALWAYS_BUSY_LANES = [CommandLane.Cron, CommandLane.CronNested] as const;
// Lanes whose pending work defers a heartbeat only when `heartbeat.skipWhenBusy` is true.
const HEARTBEAT_OPT_IN_BUSY_LANES = [CommandLane.Subagent, CommandLane.Nested] as const;
/**
 * Reports whether any of the given lanes currently has work, as reported by
 * `getSize`.
 *
 * @param lanes - Lane names to probe, in order.
 * @param getSize - Lookup returning a lane's size; a lane counts as busy when
 *   the returned value is greater than zero.
 * @returns `true` as soon as one lane reports a positive size, otherwise `false`.
 */
function hasQueuedWorkInLanes(
  lanes: readonly string[],
  getSize: (lane?: string) => number,
): boolean {
  for (const laneName of lanes) {
    if (getSize(laneName) > 0) {
      // Stop probing once any lane is busy.
      return true;
    }
  }
  return false;
}
/**
 * Reports whether any snapshot for a matching lane shows pending work.
 *
 * @param snapshots - Lane snapshots to inspect, in order.
 * @param matchesLane - Predicate selecting which lane names are relevant.
 * @returns `true` when a matching snapshot has `activeCount + queuedCount`
 *   greater than zero, otherwise `false`.
 */
function hasQueuedWorkInLaneSnapshots(
  snapshots: readonly CommandLaneSnapshot[],
  matchesLane: (lane: string) => boolean,
): boolean {
  for (const laneSnapshot of snapshots) {
    // Only snapshots whose lane passes the predicate are considered.
    if (!matchesLane(laneSnapshot.lane)) {
      continue;
    }
    const pendingCount = laneSnapshot.activeCount + laneSnapshot.queuedCount;
    if (pendingCount > 0) {
      return true;
    }
  }
  return false;
}
/**
 * Reports whether any of the opt-in lanes has pending work.
 *
 * The lanes in `HEARTBEAT_OPT_IN_BUSY_LANES` are checked first through
 * `getSize`. Only when those are idle are lane snapshots fetched and scanned
 * for lanes accepted by `isNestedAgentLane`.
 *
 * @param getSize - Lookup returning a lane's size.
 * @param getSnapshots - Supplier of current lane snapshots; called only when
 *   the fixed opt-in lanes report no work.
 * @returns `true` when either check finds pending work.
 */
function hasOptInBusyLaneWork(
  getSize: (lane?: string) => number,
  getSnapshots: () => readonly CommandLaneSnapshot[],
): boolean {
  if (hasQueuedWorkInLanes(HEARTBEAT_OPT_IN_BUSY_LANES, getSize)) {
    return true;
  }
  const laneSnapshots = getSnapshots();
  return hasQueuedWorkInLaneSnapshots(laneSnapshots, isNestedAgentLane);
}
function resolveHeartbeatChannelPlugin(channel: string): ChannelPlugin | undefined {
const activePlugin = getActivePluginChannelRegistry()?.channels.find(
(entry) => entry.plugin.id === channel,
@@ -755,9 +795,28 @@ export async function runHeartbeatOnce(opts: {
return { status: "skipped", reason: "quiet-hours" };
}
const queueSize = (opts.deps?.getQueueSize ?? getQueueSize)(CommandLane.Main);
if (queueSize > 0) {
return { status: "skipped", reason: "requests-in-flight" };
const getSize = opts.deps?.getQueueSize ?? getQueueSize;
const getSnapshots = opts.deps?.getCommandLaneSnapshots ?? getCommandLaneSnapshots;
if (getSize(CommandLane.Main) > 0) {
return { status: "skipped", reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT };
}
if (hasActiveCronJobs() || hasQueuedWorkInLanes(HEARTBEAT_ALWAYS_BUSY_LANES, getSize)) {
emitHeartbeatEvent({
status: "skipped",
reason: HEARTBEAT_SKIP_CRON_IN_PROGRESS,
durationMs: Date.now() - startedAt,
});
return { status: "skipped", reason: HEARTBEAT_SKIP_CRON_IN_PROGRESS };
}
if (heartbeat?.skipWhenBusy === true && hasOptInBusyLaneWork(getSize, getSnapshots)) {
emitHeartbeatEvent({
status: "skipped",
reason: HEARTBEAT_SKIP_LANES_BUSY,
durationMs: Date.now() - startedAt,
});
return { status: "skipped", reason: HEARTBEAT_SKIP_LANES_BUSY };
}
// Preflight centralizes trigger classification, event inspection, and HEARTBEAT.md gating.
@@ -782,14 +841,13 @@ export async function runHeartbeatOnce(opts: {
// an active streaming turn. The wake-layer retry (heartbeat-wake.ts) will
// re-schedule this wake automatically. See #14396 (closed without merge).
const sessionLaneKey = resolveEmbeddedSessionLane(sessionKey);
const sessionLaneSize = (opts.deps?.getQueueSize ?? getQueueSize)(sessionLaneKey);
if (sessionLaneSize > 0) {
if (getSize(sessionLaneKey) > 0) {
emitHeartbeatEvent({
status: "skipped",
reason: "requests-in-flight",
reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT,
durationMs: Date.now() - startedAt,
});
return { status: "skipped", reason: "requests-in-flight" };
return { status: "skipped", reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT };
}
const previousUpdatedAt = entry?.updatedAt;
@@ -1484,9 +1542,9 @@ export function startHeartbeatRunner(opts: {
const startedAt = Date.now();
const now = startedAt;
let ran = false;
// Track requests-in-flight so we can skip re-arm in finally — the wake
// Track retryable busy skips so we can skip re-arm in finally — the wake
// layer handles retry for this case (DEFAULT_RETRY_MS = 1 s).
let requestsInFlight = false;
let retryableBusySkip = false;
try {
if (requestedSessionKey || requestedAgentId) {
@@ -1504,6 +1562,10 @@ export function startHeartbeatRunner(opts: {
sessionKey: requestedSessionKey,
deps: { runtime: state.runtime },
});
if (res.status === "skipped" && isRetryableHeartbeatBusySkipReason(res.reason)) {
retryableBusySkip = true;
return res;
}
if (res.status !== "skipped" || res.reason !== "disabled") {
advanceAgentSchedule(targetAgent, now, reason);
}
@@ -1538,12 +1600,12 @@ export function startHeartbeatRunner(opts: {
advanceAgentSchedule(agent, now, reason);
continue;
}
if (res.status === "skipped" && res.reason === "requests-in-flight") {
if (res.status === "skipped" && isRetryableHeartbeatBusySkipReason(res.reason)) {
// Do not advance the schedule — the runtime is busy (in-flight requests,
// cron work, or opt-in lane pressure) and the wake layer will retry
// shortly (DEFAULT_RETRY_MS = 1 s). Calling scheduleNext() here would
// register a 0 ms timer that races with the wake layer's 1 s retry and
// wins, bypassing the cooldown.
requestsInFlight = true;
retryableBusySkip = true;
return res;
}
if (res.status !== "skipped" || res.reason !== "disabled") {
@@ -1559,9 +1621,9 @@ export function startHeartbeatRunner(opts: {
}
return { status: "skipped", reason: isInterval ? "not-due" : "disabled" };
} finally {
// Always re-arm the timer — except for requests-in-flight, where the
// Always re-arm the timer — except for retryable busy skips, where the
// wake layer (heartbeat-wake.ts) handles retry via schedule(DEFAULT_RETRY_MS).
if (!requestsInFlight) {
if (!retryableBusySkip) {
scheduleNext();
}
}

View File

@@ -1,5 +1,8 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import {
HEARTBEAT_SKIP_CRON_IN_PROGRESS,
HEARTBEAT_SKIP_LANES_BUSY,
HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT,
hasHeartbeatWakeHandler,
hasPendingHeartbeatWake,
requestHeartbeatNow,
@@ -11,7 +14,7 @@ describe("heartbeat-wake", () => {
function setRetryOnceHeartbeatHandler() {
const handler = vi
.fn()
.mockResolvedValueOnce({ status: "skipped", reason: "requests-in-flight" })
.mockResolvedValueOnce({ status: "skipped", reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT })
.mockResolvedValueOnce({ status: "ran", durationMs: 1 });
setHeartbeatWakeHandler(handler);
return handler;
@@ -72,7 +75,7 @@ describe("heartbeat-wake", () => {
vi.useFakeTimers();
const handler = vi
.fn()
.mockResolvedValueOnce({ status: "skipped", reason: "requests-in-flight" })
.mockResolvedValueOnce({ status: "skipped", reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT })
.mockResolvedValueOnce({ status: "ran", durationMs: 1 });
await expectRetryAfterDefaultDelay({
handler,
@@ -81,6 +84,22 @@ describe("heartbeat-wake", () => {
});
});
// The cron-in-progress and lanes-busy skip reasons must be retried after the
// default delay, keeping the original wake reason on the retry.
it.each([HEARTBEAT_SKIP_CRON_IN_PROGRESS, HEARTBEAT_SKIP_LANES_BUSY])(
  "retries %s after the default retry delay",
  async (busyReason) => {
    vi.useFakeTimers();
    // First call reports the busy skip, second call reports a completed run.
    const busySkip = { status: "skipped", reason: busyReason };
    const completedRun = { status: "ran", durationMs: 1 };
    const handler = vi.fn();
    handler.mockResolvedValueOnce(busySkip);
    handler.mockResolvedValueOnce(completedRun);
    await expectRetryAfterDefaultDelay({
      handler,
      initialReason: "interval",
      expectedRetryReason: "interval",
    });
  },
);
it("keeps retry cooldown even when a sooner request arrives", async () => {
vi.useFakeTimers();
const handler = setRetryOnceHeartbeatHandler();
@@ -219,7 +238,9 @@ describe("heartbeat-wake", () => {
it("clears stale retry cooldown when a new handler is registered", async () => {
vi.useFakeTimers();
const handlerA = vi.fn().mockResolvedValue({ status: "skipped", reason: "requests-in-flight" });
const handlerA = vi
.fn()
.mockResolvedValue({ status: "skipped", reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT });
setHeartbeatWakeHandler(handlerA);
requestHeartbeatNow({ reason: "interval", coalesceMs: 0 });

View File

@@ -10,6 +10,24 @@ export type HeartbeatRunResult =
| { status: "skipped"; reason: string }
| { status: "failed"; reason: string };
/** Skip reason reported when the main lane or the target session lane already has work. */
export const HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT = "requests-in-flight";
/** Skip reason reported while cron jobs are active or an always-busy lane has work. */
export const HEARTBEAT_SKIP_CRON_IN_PROGRESS = "cron-in-progress";
/** Skip reason reported when `skipWhenBusy` is enabled and an opt-in busy lane has work. */
export const HEARTBEAT_SKIP_LANES_BUSY = "lanes-busy";

/** Skip reasons that mean "busy right now": the wake layer retries them soon. */
export type RetryableHeartbeatBusySkipReason =
  | typeof HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT
  | typeof HEARTBEAT_SKIP_CRON_IN_PROGRESS
  | typeof HEARTBEAT_SKIP_LANES_BUSY;

// Read-only lookup table; typed over `string` so arbitrary skip reasons can be tested.
const RETRYABLE_BUSY_SKIP_REASONS: ReadonlySet<string> = new Set<string>([
  HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT,
  HEARTBEAT_SKIP_CRON_IN_PROGRESS,
  HEARTBEAT_SKIP_LANES_BUSY,
]);

/**
 * Type guard for skip reasons that should be retried rather than treated as
 * a completed heartbeat attempt.
 *
 * @param reason - The `reason` of a skipped heartbeat result.
 * @returns `true` when `reason` is one of the retryable busy skip reasons, in
 *   which case it is narrowed to {@link RetryableHeartbeatBusySkipReason}.
 */
export function isRetryableHeartbeatBusySkipReason(
  reason: string,
): reason is RetryableHeartbeatBusySkipReason {
  return RETRYABLE_BUSY_SKIP_REASONS.has(reason);
}
export type HeartbeatWakeRequest = {
reason?: string;
agentId?: string;
@@ -175,8 +193,8 @@ function schedule(coalesceMs: number, kind: WakeTimerKind = "normal") {
...(pendingWake.heartbeat ? { heartbeat: pendingWake.heartbeat } : {}),
};
const res = await active(wakeOpts);
if (res.status === "skipped" && res.reason === "requests-in-flight") {
// The main lane is busy; retry this wake target soon.
if (res.status === "skipped" && isRetryableHeartbeatBusySkipReason(res.reason)) {
// The target runtime is busy; retry this wake target soon.
queuePendingWakeReason({
reason: pendingWake.reason ?? "retry",
agentId: pendingWake.agentId,

View File

@@ -27,6 +27,7 @@ let enqueueCommandInLane: CommandQueueModule["enqueueCommandInLane"];
let GatewayDrainingError: CommandQueueModule["GatewayDrainingError"];
let getActiveTaskCount: CommandQueueModule["getActiveTaskCount"];
let getCommandLaneSnapshot: CommandQueueModule["getCommandLaneSnapshot"];
let getCommandLaneSnapshots: CommandQueueModule["getCommandLaneSnapshots"];
let getQueueSize: CommandQueueModule["getQueueSize"];
let markGatewayDraining: CommandQueueModule["markGatewayDraining"];
let resetAllLanes: CommandQueueModule["resetAllLanes"];
@@ -67,6 +68,7 @@ describe("command queue", () => {
GatewayDrainingError,
getActiveTaskCount,
getCommandLaneSnapshot,
getCommandLaneSnapshots,
getQueueSize,
markGatewayDraining,
resetAllLanes,
@@ -349,6 +351,38 @@ describe("command queue", () => {
await expect(second).resolves.toBe("second");
});
// Verifies that getCommandLaneSnapshots() includes every lane with live work
// and returns the entries ordered by lane name.
it("getCommandLaneSnapshots reports all live lanes in stable order", async () => {
// Randomized suffixes give each run fresh lane names; the filter further down
// ignores any other lanes that may be present in the queue state.
const alphaLane = `snapshot-all-alpha-${Date.now()}-${Math.random().toString(16).slice(2)}`;
const betaLane = `snapshot-all-beta-${Date.now()}-${Math.random().toString(16).slice(2)}`;
setCommandLaneConcurrency(alphaLane, 1);
setCommandLaneConcurrency(betaLane, 1);
// Each task awaits its blocker, so it is still unfinished when the snapshot
// is taken (presumably createDeferred() exposes a manually resolved promise).
const alphaBlocker = createDeferred();
const betaBlocker = createDeferred();
const alpha = enqueueCommandInLane(alphaLane, async () => {
await alphaBlocker.promise;
return "alpha";
});
const beta = enqueueCommandInLane(betaLane, async () => {
await betaBlocker.promise;
return "beta";
});
// Snapshot while both tasks are blocked; keep only the two lanes under test.
const snapshots = getCommandLaneSnapshots().filter(
(snapshot) => snapshot.lane === alphaLane || snapshot.lane === betaLane,
);
// Order check first, then per-lane counts: one active task, nothing queued.
expect(snapshots.map((snapshot) => snapshot.lane)).toEqual([alphaLane, betaLane]);
expect(snapshots).toEqual([
expect.objectContaining({ lane: alphaLane, activeCount: 1, queuedCount: 0 }),
expect.objectContaining({ lane: betaLane, activeCount: 1, queuedCount: 0 }),
]);
// Release both tasks and confirm they settle with their return values.
alphaBlocker.resolve();
betaBlocker.resolve();
await expect(alpha).resolves.toBe("alpha");
await expect(beta).resolves.toBe("beta");
});
it("waitForActiveTasks ignores tasks that start after the call", async () => {
const lane = `drain-snapshot-${Date.now()}-${Math.random().toString(16).slice(2)}`;
setCommandLaneConcurrency(lane, 2);

View File

@@ -103,6 +103,17 @@ function getLaneDepth(state: LaneState): number {
return state.queue.length + state.activeTaskIds.size;
}
/**
 * Builds a snapshot of a lane's current counters and settings.
 *
 * @param state - Lane state to read from.
 * @returns A new object holding the lane name, queued count (queue length),
 *   active count (number of active task ids), concurrency limit, draining
 *   flag, and generation.
 */
function createCommandLaneSnapshot(state: LaneState): CommandLaneSnapshot {
  const { lane, maxConcurrent, draining, generation } = state;
  const queuedCount = state.queue.length;
  const activeCount = state.activeTaskIds.size;
  return { lane, queuedCount, activeCount, maxConcurrent, draining, generation };
}
function getLaneState(lane: string): LaneState {
const queueState = getQueueState();
const existing = queueState.lanes.get(lane);
@@ -309,14 +320,13 @@ export function getCommandLaneSnapshot(lane: string = CommandLane.Main): Command
generation: 0,
};
}
return {
lane: resolved,
queuedCount: state.queue.length,
activeCount: state.activeTaskIds.size,
maxConcurrent: state.maxConcurrent,
draining: state.draining,
generation: state.generation,
};
return createCommandLaneSnapshot(state);
}
/**
 * Returns a snapshot for every lane currently held in the queue state.
 *
 * @returns A new array of lane snapshots ordered by lane name using
 *   `String.prototype.localeCompare`.
 */
export function getCommandLaneSnapshots(): CommandLaneSnapshot[] {
  const snapshots: CommandLaneSnapshot[] = [];
  for (const laneState of getQueueState().lanes.values()) {
    snapshots.push(createCommandLaneSnapshot(laneState));
  }
  return snapshots.toSorted((left, right) => left.lane.localeCompare(right.lane));
}
export function getTotalQueueSize() {