diff --git a/CHANGELOG.md b/CHANGELOG.md index 65db02a80f8..dc8ce4935de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ Docs: https://docs.openclaw.ai - Channels/Discord: split long CJK replies at punctuation and code-point-safe fallback boundaries so Discord chunking stays readable without corrupting astral characters. Fixes #38597; repairs #71384. Thanks @p3nchan. - Browser/gateway: ignore Playwright dialog-close races from `Page.handleJavaScriptDialog` so browser automation no longer crashes the Gateway when a dialog disappears before Playwright accepts it. (#40067) Thanks @randyjtw. - Cron/Gateway: defer missed isolated agent-turn catch-up out of the channel startup window, so overdue cron work cannot starve Discord or Telegram while providers connect after a restart. Thanks @vincentkoc. +- Heartbeat/cron: defer heartbeat turns while cron work is active or queued, add opt-in `heartbeat.skipWhenBusy` for subagent/nested lane pressure, and retry busy skips without advancing the schedule so local Ollama hosts do not run heartbeat and cron prompts concurrently. Fixes #50773. Thanks @scottgl9. - Plugins/runtime-deps: prune stale `openclaw-unknown-*` bundled runtime dependency roots during Gateway startup while keeping recent or locked roots, so old staging debris cannot keep growing across restarts. Thanks @vincentkoc. - Ollama: compose caller abort signals with guarded-fetch timeouts for native `/api/chat` streams, so `/stop` and early cancellation still interrupt local Ollama requests that also carry provider timeout budgets. Refs #74133. Thanks @obviyus. - Doctor/TTS: migrate legacy `messages.tts.enabled`, agent TTS, channel TTS, and voice-call plugin TTS toggles to `auto` mode during `openclaw doctor --fix`, matching the documented TTS config contract. Thanks @vincentkoc. 
diff --git a/docs/.generated/config-baseline.sha256 b/docs/.generated/config-baseline.sha256 index f1060e0fce8..fdab1e35af2 100644 --- a/docs/.generated/config-baseline.sha256 +++ b/docs/.generated/config-baseline.sha256 @@ -1,4 +1,4 @@ -96edba9fd67fa057f6b6c43d54a168db25d0e27ddd4e91a7e2918c8657f0f212 config-baseline.json -510ed7af2e3731c8a307dbc10181328f82764a4e8dd9e9dddc6118db6f882ff7 config-baseline.core.json +592d25e08647ced4fae0c4fdbff95e50d1749c42d39070f6b6bc6a3e0475d4f0 config-baseline.json +9cd2c40b4a45976b74458f9ada8ecc31c532ee81f10145a9828bbff31777c03e config-baseline.core.json 9f5fad66a49fa618d64a963470aa69fed9fe4b4639cc4321f9ec04bfb2f8aa50 config-baseline.channel.json 0dd6583fafae6c9134e46c4cf9bddee9822d6436436dcb1a6dcba6d012962e51 config-baseline.plugin.json diff --git a/docs/automation/index.md b/docs/automation/index.md index cd8b49a330a..7c4c6306754 100644 --- a/docs/automation/index.md +++ b/docs/automation/index.md @@ -93,7 +93,7 @@ See [Hooks](/automation/hooks). ### Heartbeat -Heartbeat is a periodic main-session turn (default every 30 minutes). It batches multiple checks (inbox, calendar, notifications) in one agent turn with full session context. Heartbeat turns do not create task records and do not extend daily/idle session reset freshness. Use `HEARTBEAT.md` for a small checklist, or a `tasks:` block when you want due-only periodic checks inside heartbeat itself. Empty heartbeat files skip as `empty-heartbeat-file`; due-only task mode skips as `no-tasks-due`. +Heartbeat is a periodic main-session turn (default every 30 minutes). It batches multiple checks (inbox, calendar, notifications) in one agent turn with full session context. Heartbeat turns do not create task records and do not extend daily/idle session reset freshness. Use `HEARTBEAT.md` for a small checklist, or a `tasks:` block when you want due-only periodic checks inside heartbeat itself. 
Empty heartbeat files skip as `empty-heartbeat-file`; due-only task mode skips as `no-tasks-due`. Heartbeats defer while cron work is active or queued, and `heartbeat.skipWhenBusy` can also defer them while subagent or nested lanes are busy. See [Heartbeat](/gateway/heartbeat). diff --git a/docs/gateway/config-agents.md b/docs/gateway/config-agents.md index b9c2eaf6523..fe310945c0c 100644 --- a/docs/gateway/config-agents.md +++ b/docs/gateway/config-agents.md @@ -516,6 +516,7 @@ Periodic heartbeat runs. includeSystemPromptSection: true, // default: true; false omits the Heartbeat section from the system prompt lightContext: false, // default: false; true keeps only HEARTBEAT.md from workspace bootstrap files isolatedSession: false, // default: false; true runs each heartbeat in a fresh session (no conversation history) + skipWhenBusy: false, // default: false; true also waits for subagent/nested lanes session: "main", to: "+15555550123", directPolicy: "allow", // allow (default) | block @@ -537,6 +538,7 @@ Periodic heartbeat runs. - `directPolicy`: direct/DM delivery policy. `allow` (default) permits direct-target delivery. `block` suppresses direct-target delivery and emits `reason=dm-blocked`. - `lightContext`: when true, heartbeat runs use lightweight bootstrap context and keep only `HEARTBEAT.md` from workspace bootstrap files. - `isolatedSession`: when true, each heartbeat runs in a fresh session with no prior conversation history. Same isolation pattern as cron `sessionTarget: "isolated"`. Reduces per-heartbeat token cost from ~100K to ~2-5K tokens. +- `skipWhenBusy`: when true, heartbeat runs defer on extra busy lanes: subagent or nested command work. Cron lanes always defer heartbeats, even without this flag. - Per-agent: set `agents.list[].heartbeat`. When any agent defines `heartbeat`, **only those agents** run heartbeats. - Heartbeats run full agent turns — shorter intervals burn more tokens. 
diff --git a/docs/gateway/heartbeat.md b/docs/gateway/heartbeat.md index f17e225349d..b7533e2de57 100644 --- a/docs/gateway/heartbeat.md +++ b/docs/gateway/heartbeat.md @@ -50,6 +50,7 @@ Example config: directPolicy: "allow", // default: allow direct/DM targets; set "block" to suppress lightContext: true, // optional: only inject HEARTBEAT.md from bootstrap files isolatedSession: true, // optional: fresh session each run (no conversation history) + skipWhenBusy: true, // optional: also defer when subagent or nested lanes are busy // activeHours: { start: "08:00", end: "24:00" }, // includeReasoning: true, // optional: send separate `Reasoning:` message too }, @@ -65,6 +66,7 @@ Example config: - The heartbeat prompt is sent **verbatim** as the user message. The system prompt includes a "Heartbeat" section only when heartbeats are enabled for the default agent, and the run is flagged internally. - When heartbeats are disabled with `0m`, normal runs also omit `HEARTBEAT.md` from bootstrap context so the model does not see heartbeat-only instructions. - Active hours (`heartbeat.activeHours`) are checked in the configured timezone. Outside the window, heartbeats are skipped until the next tick inside the window. +- Heartbeats automatically defer while cron work is active or queued. Set `heartbeat.skipWhenBusy: true` to defer on extra busy lanes (subagent or nested command work) as well; this is useful for local Ollama and other constrained single-runtime hosts. 
## What the heartbeat prompt is for @@ -98,6 +100,7 @@ Outside heartbeats, stray `HEARTBEAT_OK` at the start/end of a message is stripp includeReasoning: false, // default: false (deliver separate Reasoning: message when available) lightContext: false, // default: false; true keeps only HEARTBEAT.md from workspace bootstrap files isolatedSession: false, // default: false; true runs each heartbeat in a fresh session (no conversation history) + skipWhenBusy: false, // default: false; true also waits for subagent/nested lanes target: "last", // default: none | options: last | none | (core or plugin, e.g. "bluebubbles") to: "+15551234567", // optional channel-specific override accountId: "ops-bot", // optional multi-account channel id @@ -230,6 +233,9 @@ Use `accountId` to target a specific account on multi-account channels like Tele When true, each heartbeat runs in a fresh session with no prior conversation history. Uses the same isolation pattern as cron `sessionTarget: "isolated"`. Dramatically reduces per-heartbeat token cost. Combine with `lightContext: true` for maximum savings. Delivery routing still uses the main session context. + + When true, heartbeat runs defer on extra busy lanes: subagent or nested command work. Cron lanes always defer heartbeats, even without this flag, so local-model hosts do not run cron and heartbeat prompts at the same time. + Optional session key for heartbeat runs. @@ -287,7 +293,8 @@ Use `accountId` to target a specific account on multi-account channels like Tele - `session` only affects the run context; delivery is controlled by `target` and `to`. - To deliver to a specific channel/recipient, set `target` + `to`. With `target: "last"`, delivery uses the last external channel for that session. - Heartbeat deliveries allow direct/DM targets by default. Set `directPolicy: "block"` to suppress direct-target sends while still running the heartbeat turn. - - If the main queue is busy, the heartbeat is skipped and retried later. 
+ - If the main queue, the target session lane, or a cron lane is busy, or a cron job is active, the heartbeat is skipped and retried later. + - If `skipWhenBusy: true`, subagent and nested lanes also defer heartbeat runs. - If `target` resolves to no external destination, the run still happens but no outbound message is sent. diff --git a/docs/gateway/troubleshooting.md b/docs/gateway/troubleshooting.md index 8ef31c8825c..fd37d4f825b 100644 --- a/docs/gateway/troubleshooting.md +++ b/docs/gateway/troubleshooting.md @@ -437,7 +437,7 @@ Look for: - Cron enabled and next wake present. - Job run history status (`ok`, `skipped`, `error`). -- Heartbeat skip reasons (`quiet-hours`, `requests-in-flight`, `alerts-disabled`, `empty-heartbeat-file`, `no-tasks-due`). +- Heartbeat skip reasons (`quiet-hours`, `requests-in-flight`, `cron-in-progress`, `lanes-busy`, `alerts-disabled`, `empty-heartbeat-file`, `no-tasks-due`). diff --git a/src/config/schema.base.generated.ts b/src/config/schema.base.generated.ts index 38cafcfcef2..72dc15dd1bf 100644 --- a/src/config/schema.base.generated.ts +++ b/src/config/schema.base.generated.ts @@ -5464,6 +5464,12 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = { isolatedSession: { type: "boolean", }, + skipWhenBusy: { + type: "boolean", + title: "Heartbeat Skip When Busy", + description: + "When true, defer heartbeat turns on extra busy lanes: subagent or nested command work. Cron lanes always defer heartbeat turns.", + }, }, additionalProperties: false, }, @@ -7198,6 +7204,12 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = { isolatedSession: { type: "boolean", }, + skipWhenBusy: { + type: "boolean", + title: "Heartbeat Skip When Busy", + description: + "Per-agent override that defers heartbeat turns on extra busy lanes: subagent or nested command work. 
Cron lanes always defer heartbeat turns.", + }, }, additionalProperties: false, }, @@ -27261,6 +27273,16 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = { label: "Heartbeat Timeout (Seconds)", tags: ["performance", "automation"], }, + "agents.defaults.heartbeat.skipWhenBusy": { + label: "Heartbeat Skip When Busy", + help: "When true, defer heartbeat turns on extra busy lanes: subagent or nested command work. Cron lanes always defer heartbeat turns.", + tags: ["automation"], + }, + "agents.list.*.heartbeat.skipWhenBusy": { + label: "Heartbeat Skip When Busy", + help: "Per-agent override that defers heartbeat turns on extra busy lanes: subagent or nested command work. Cron lanes always defer heartbeat turns.", + tags: ["automation"], + }, "agents.defaults.sandbox.browser.network": { label: "Sandbox Browser Network", help: "Docker network for sandbox browser containers (default: openclaw-sandbox-browser). Avoid bridge if you need stricter isolation.", @@ -28419,6 +28441,11 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = { help: "Per-agent maximum time in seconds allowed for a heartbeat agent turn before it is aborted. Leave unset to inherit the merged heartbeat/default agent timeout.", tags: ["performance", "automation"], }, + "agents.list[].heartbeat.skipWhenBusy": { + label: "Agent Heartbeat Skip When Busy", + help: "Per-agent override that defers heartbeat turns on extra busy lanes: subagent or nested command work. 
Cron lanes always defer heartbeat turns.", + tags: ["automation"], + }, "agents.list[].sandbox.browser.network": { label: "Agent Sandbox Browser Network", help: "Per-agent override for sandbox browser Docker network.", diff --git a/src/config/schema.help.ts b/src/config/schema.help.ts index 6f7d6fa55ff..06854bb109b 100644 --- a/src/config/schema.help.ts +++ b/src/config/schema.help.ts @@ -256,6 +256,10 @@ export const FIELD_HELP: Record = { "Maximum time in seconds allowed for a heartbeat agent turn before it is aborted. Leave unset to use agents.defaults.timeoutSeconds.", "agents.list[].heartbeat.timeoutSeconds": "Per-agent maximum time in seconds allowed for a heartbeat agent turn before it is aborted. Leave unset to inherit the merged heartbeat/default agent timeout.", + "agents.defaults.heartbeat.skipWhenBusy": + "When true, defer heartbeat turns on extra busy lanes: subagent or nested command work. Cron lanes always defer heartbeat turns.", + "agents.list[].heartbeat.skipWhenBusy": + "Per-agent override that defers heartbeat turns on extra busy lanes: subagent or nested command work. Cron lanes always defer heartbeat turns.", browser: "Browser runtime controls for local or remote CDP attachment, profile routing, and screenshot/snapshot behavior. Keep defaults unless your automation workflow requires custom browser transport settings.", "browser.enabled": @@ -1684,6 +1688,8 @@ export const FIELD_HELP: Record = { 'Controls whether heartbeat delivery may target direct/DM chats: "allow" (default) permits DM delivery and "block" suppresses direct-target sends.', "agents.list.*.heartbeat.directPolicy": 'Per-agent override for heartbeat direct/DM delivery policy; use "block" for agents that should only send heartbeat alerts to non-DM destinations.', + "agents.list.*.heartbeat.skipWhenBusy": + "Per-agent override that defers heartbeat turns on extra busy lanes: subagent or nested command work. 
Cron lanes always defer heartbeat turns.", "channels.mattermost.configWrites": "Allow Mattermost to write config in response to channel events/commands (default: true).", "channels.modelByChannel": diff --git a/src/config/schema.labels.ts b/src/config/schema.labels.ts index 9718d9082c3..54045431afc 100644 --- a/src/config/schema.labels.ts +++ b/src/config/schema.labels.ts @@ -634,6 +634,8 @@ export const FIELD_LABELS: Record = { "agents.list.*.heartbeat.suppressToolErrorWarnings": "Heartbeat Suppress Tool Error Warnings", "agents.defaults.heartbeat.timeoutSeconds": "Heartbeat Timeout (Seconds)", "agents.list.*.heartbeat.timeoutSeconds": "Heartbeat Timeout (Seconds)", + "agents.defaults.heartbeat.skipWhenBusy": "Heartbeat Skip When Busy", + "agents.list.*.heartbeat.skipWhenBusy": "Heartbeat Skip When Busy", "agents.defaults.sandbox.browser.network": "Sandbox Browser Network", "agents.defaults.sandbox.browser.cdpSourceRange": "Sandbox Browser CDP Source Port Range", "agents.defaults.sandbox.docker.dangerouslyAllowContainerNamespaceJoin": @@ -883,6 +885,7 @@ export const FIELD_LABELS: Record = { "agents.list[].heartbeat.suppressToolErrorWarnings": "Agent Heartbeat Suppress Tool Error Warnings", "agents.list[].heartbeat.timeoutSeconds": "Agent Heartbeat Timeout (Seconds)", + "agents.list[].heartbeat.skipWhenBusy": "Agent Heartbeat Skip When Busy", "agents.list[].sandbox.browser.network": "Agent Sandbox Browser Network", "agents.list[].sandbox.browser.cdpSourceRange": "Agent Sandbox Browser CDP Source Port Range", "agents.list[].sandbox.docker.dangerouslyAllowContainerNamespaceJoin": diff --git a/src/config/types.agent-defaults.ts b/src/config/types.agent-defaults.ts index e887f03936e..9ec543104b0 100644 --- a/src/config/types.agent-defaults.ts +++ b/src/config/types.agent-defaults.ts @@ -378,6 +378,11 @@ export type AgentDefaultsConfig = { * per-heartbeat token cost by avoiding the full session transcript. 
*/ isolatedSession?: boolean; + /** + * If true, defer heartbeat runs while subagent or nested command lanes are busy. + * Cron lanes are always treated as busy for heartbeat deferral. + */ + skipWhenBusy?: boolean; /** * When enabled, deliver the model's reasoning payload for heartbeat runs (when available) * as a separate message prefixed with `Reasoning:` (same as `/reasoning on`). diff --git a/src/config/zod-schema.agent-defaults.test.ts b/src/config/zod-schema.agent-defaults.test.ts index 53294caea64..f05ae64e927 100644 --- a/src/config/zod-schema.agent-defaults.test.ts +++ b/src/config/zod-schema.agent-defaults.test.ts @@ -131,15 +131,17 @@ describe("agent defaults schema", () => { it("accepts positive heartbeat timeoutSeconds on defaults and agent entries", () => { const defaults = AgentDefaultsSchema.parse({ - heartbeat: { timeoutSeconds: 45 }, + heartbeat: { timeoutSeconds: 45, skipWhenBusy: true }, })!; const agent = AgentEntrySchema.parse({ id: "ops", - heartbeat: { timeoutSeconds: 45 }, + heartbeat: { timeoutSeconds: 45, skipWhenBusy: true }, }); expect(defaults.heartbeat?.timeoutSeconds).toBe(45); + expect(defaults.heartbeat?.skipWhenBusy).toBe(true); expect(agent.heartbeat?.timeoutSeconds).toBe(45); + expect(agent.heartbeat?.skipWhenBusy).toBe(true); }); it("accepts per-agent TTS overrides", () => { diff --git a/src/config/zod-schema.agent-runtime.ts b/src/config/zod-schema.agent-runtime.ts index 57837910204..be607c0ebb4 100644 --- a/src/config/zod-schema.agent-runtime.ts +++ b/src/config/zod-schema.agent-runtime.ts @@ -42,6 +42,7 @@ export const HeartbeatSchema = z timeoutSeconds: z.number().int().positive().optional(), lightContext: z.boolean().optional(), isolatedSession: z.boolean().optional(), + skipWhenBusy: z.boolean().optional(), }) .strict() .superRefine((val, ctx) => { diff --git a/src/cron/active-jobs.ts b/src/cron/active-jobs.ts index c3dcdf6db3d..c75220700e4 100644 --- a/src/cron/active-jobs.ts +++ b/src/cron/active-jobs.ts @@ -33,6 
+33,10 @@ export function isCronJobActive(jobId: string) { return getCronActiveJobState().activeJobIds.has(jobId); } +export function hasActiveCronJobs() { + return getCronActiveJobState().activeJobIds.size > 0; +} + export function resetCronActiveJobsForTests() { getCronActiveJobState().activeJobIds.clear(); } diff --git a/src/cron/service.runs-one-shot-main-job-disables-it.test.ts b/src/cron/service.runs-one-shot-main-job-disables-it.test.ts index 1fb48999099..6547c0d0603 100644 --- a/src/cron/service.runs-one-shot-main-job-disables-it.test.ts +++ b/src/cron/service.runs-one-shot-main-job-disables-it.test.ts @@ -1,6 +1,10 @@ import path from "node:path"; import { beforeEach, describe, expect, it, vi } from "vitest"; -import type { HeartbeatRunResult } from "../infra/heartbeat-wake.js"; +import { + HEARTBEAT_SKIP_CRON_IN_PROGRESS, + HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT, + type HeartbeatRunResult, +} from "../infra/heartbeat-wake.js"; import type { CronEvent, CronServiceDeps } from "./service.js"; import { CronService } from "./service.js"; import { createDeferred, createNoopLogger, installCronTestHooks } from "./service.test-harness.js"; @@ -524,7 +528,7 @@ describe("CronService", () => { it("wakeMode now falls back to queued heartbeat when main lane stays busy", async () => { const runHeartbeatOnce = vi.fn(async () => ({ status: "skipped" as const, - reason: "requests-in-flight", + reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT, })); let now = 0; const nowMs = () => { @@ -562,6 +566,38 @@ describe("CronService", () => { await stopCronAndCleanup(cron, store); }); + it("wakeMode now queues heartbeat when cron active marker blocks synchronous wake", async () => { + const runHeartbeatOnce = vi.fn(async () => ({ + status: "skipped" as const, + reason: HEARTBEAT_SKIP_CRON_IN_PROGRESS, + })); + + const { store, cron, requestHeartbeatNow } = await createWakeModeNowMainHarness({ + runHeartbeatOnce, + }); + + const sessionKey = "agent:main:discord:channel:ops"; + const job = await 
addWakeModeNowMainSystemEventJob(cron, { + name: "wakeMode now cron marker fallback", + sessionKey, + }); + + await cron.run(job.id, "force"); + + expect(runHeartbeatOnce).toHaveBeenCalledTimes(1); + expect(requestHeartbeatNow).toHaveBeenCalledWith( + expect.objectContaining({ + reason: `cron:${job.id}`, + sessionKey, + }), + ); + expect(job.state.lastStatus).toBe("ok"); + expect(job.state.lastError).toBeUndefined(); + + await cron.list({ includeDisabled: true }); + await stopCronAndCleanup(cron, store); + }); + it("runs an isolated job without posting a fallback summary to main", async () => { const runIsolatedAgentJob = vi.fn(async () => ({ status: "ok" as const, summary: "done" })); const { store, cron, enqueueSystemEvent, requestHeartbeatNow, events } = diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index b7b8000258e..0d1fddafd88 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -1,6 +1,10 @@ import { resolveFailoverReasonFromError } from "../../agents/failover-error.js"; import type { CronConfig, CronRetryOn } from "../../config/types.cron.js"; import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js"; +import { + HEARTBEAT_SKIP_CRON_IN_PROGRESS, + isRetryableHeartbeatBusySkipReason, +} from "../../infra/heartbeat-wake.js"; import { DEFAULT_AGENT_ID } from "../../routing/session-key.js"; import { normalizeOptionalLowercaseString } from "../../shared/string-coerce.js"; import { @@ -1350,13 +1354,17 @@ async function executeMainSessionCronJob( sessionKey: targetMainSessionKey, heartbeat: { target: "last" }, }); - if (heartbeatResult.status !== "skipped" || heartbeatResult.reason !== "requests-in-flight") { + if ( + heartbeatResult.status !== "skipped" || + !isRetryableHeartbeatBusySkipReason(heartbeatResult.reason) + ) { break; } - if (isRecurringJob) { + if (isRecurringJob || heartbeatResult.reason === HEARTBEAT_SKIP_CRON_IN_PROGRESS) { // Recurring main-session cron jobs should not hold the cron lane 
open - // while the main lane is busy, or their measured duration starts to - // reflect queue wait instead of cron bookkeeping (#58833). + // while runtime lanes are busy. A cron-in-progress skip is caused by + // this job's own active marker, so direct wake-now cannot succeed until + // the cron job returns and clears it (#50773). state.deps.requestHeartbeatNow({ reason, agentId: job.agentId, diff --git a/src/infra/heartbeat-runner.scheduler.test.ts b/src/infra/heartbeat-runner.scheduler.test.ts index 8b0f7f18aa9..038e4f652e1 100644 --- a/src/infra/heartbeat-runner.scheduler.test.ts +++ b/src/infra/heartbeat-runner.scheduler.test.ts @@ -2,7 +2,13 @@ import { afterEach, describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../config/config.js"; import { startHeartbeatRunner } from "./heartbeat-runner.js"; import { computeNextHeartbeatPhaseDueMs, resolveHeartbeatPhaseMs } from "./heartbeat-schedule.js"; -import { requestHeartbeatNow, resetHeartbeatWakeStateForTests } from "./heartbeat-wake.js"; +import { + HEARTBEAT_SKIP_CRON_IN_PROGRESS, + HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT, + type RetryableHeartbeatBusySkipReason, + requestHeartbeatNow, + resetHeartbeatWakeStateForTests, +} from "./heartbeat-wake.js"; describe("startHeartbeatRunner", () => { type RunOnce = Parameters[0]["runOnce"]; @@ -44,12 +50,12 @@ describe("startHeartbeatRunner", () => { }); } - function createRequestsInFlightRunSpy(skipCount: number) { + function createRetryableBusyRunSpy(reason: RetryableHeartbeatBusySkipReason, skipCount: number) { let callCount = 0; return vi.fn().mockImplementation(async () => { callCount++; if (callCount <= skipCount) { - return { status: "skipped", reason: "requests-in-flight" } as const; + return { status: "skipped", reason } as const; } return { status: "ran", durationMs: 1 } as const; }); @@ -214,7 +220,7 @@ describe("startHeartbeatRunner", () => { it("reschedules timer when runOnce returns requests-in-flight", async () => { 
useFakeHeartbeatTime(); - const runSpy = createRequestsInFlightRunSpy(1); + const runSpy = createRetryableBusyRunSpy(HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT, 1); const runner = startHeartbeatRunner({ cfg: heartbeatConfig(), @@ -235,6 +241,27 @@ describe("startHeartbeatRunner", () => { runner.stop(); }); + it("reschedules timer when runOnce returns cron-in-progress", async () => { + useFakeHeartbeatTime(); + + const runSpy = createRetryableBusyRunSpy(HEARTBEAT_SKIP_CRON_IN_PROGRESS, 1); + + const runner = startHeartbeatRunner({ + cfg: heartbeatConfig(), + runOnce: runSpy, + stableSchedulerSeed: TEST_SCHEDULER_SEED, + }); + const firstDueMs = resolveDueFromNow(0, 30 * 60_000, "main"); + + await vi.advanceTimersByTimeAsync(firstDueMs + 1); + expect(runSpy).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(1_000); + expect(runSpy).toHaveBeenCalledTimes(2); + + runner.stop(); + }); + it("does not push nextDueMs forward on repeated requests-in-flight skips", async () => { useFakeHeartbeatTime(); @@ -246,7 +273,7 @@ describe("startHeartbeatRunner", () => { callTimes.push(Date.now()); callCount++; if (callCount <= 5) { - return { status: "skipped", reason: "requests-in-flight" } as const; + return { status: "skipped", reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT } as const; } return { status: "ran", durationMs: 1 } as const; }); diff --git a/src/infra/heartbeat-runner.skips-busy-session-lane.test.ts b/src/infra/heartbeat-runner.skips-busy-session-lane.test.ts index bf76952e97b..78ca93821dc 100644 --- a/src/infra/heartbeat-runner.skips-busy-session-lane.test.ts +++ b/src/infra/heartbeat-runner.skips-busy-session-lane.test.ts @@ -1,9 +1,18 @@ import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import { resolveNestedAgentLaneForSession } from "../agents/lanes.js"; import type { OpenClawConfig } from "../config/config.js"; +import { markCronJobActive, resetCronActiveJobsForTests } from "../cron/active-jobs.js"; import { 
getActivePluginRegistry, setActivePluginRegistry } from "../plugins/runtime.js"; +import type { CommandLaneSnapshot } from "../process/command-queue.js"; +import { CommandLane } from "../process/lanes.js"; import { createOutboundTestPlugin, createTestRegistry } from "../test-utils/channel-plugins.js"; import { type HeartbeatDeps, runHeartbeatOnce } from "./heartbeat-runner.js"; import { seedMainSessionStore, withTempHeartbeatSandbox } from "./heartbeat-runner.test-utils.js"; +import { + HEARTBEAT_SKIP_CRON_IN_PROGRESS, + HEARTBEAT_SKIP_LANES_BUSY, + HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT, +} from "./heartbeat-wake.js"; import { resetSystemEventsForTest, enqueueSystemEvent } from "./system-events.js"; vi.mock("jiti", () => ({ createJiti: () => () => ({}) })); @@ -33,6 +42,7 @@ afterAll(() => { beforeEach(() => { resetSystemEventsForTest(); + resetCronActiveJobsForTests(); }); function createHeartbeatTelegramConfig(): OpenClawConfig { @@ -61,7 +71,99 @@ async function seedHeartbeatTelegramSession(storePath: string, cfg: OpenClawConf }); } +function createBusyLaneSnapshot(lane: string): CommandLaneSnapshot { + return { + lane, + activeCount: 1, + queuedCount: 0, + maxConcurrent: 1, + draining: false, + generation: 0, + }; +} + describe("heartbeat runner skips when target session lane is busy", () => { + it("returns cron-in-progress when cron has an active job", async () => { + await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => { + const cfg = createHeartbeatTelegramConfig(); + await seedHeartbeatTelegramSession(storePath, cfg); + markCronJobActive("local-model-report"); + + const result = await runHeartbeatOnce({ + cfg, + deps: { + getQueueSize: vi.fn((_lane?: string) => 0), + nowMs: () => Date.now(), + getReplyFromConfig: replySpy, + } as HeartbeatDeps, + }); + + expect(result).toEqual({ status: "skipped", reason: HEARTBEAT_SKIP_CRON_IN_PROGRESS }); + expect(replySpy).not.toHaveBeenCalled(); + }); + }); + + it("returns cron-in-progress when cron lanes have 
queued work", async () => { + await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => { + const cfg = createHeartbeatTelegramConfig(); + await seedHeartbeatTelegramSession(storePath, cfg); + + const result = await runHeartbeatOnce({ + cfg, + deps: { + getQueueSize: vi.fn((lane?: string) => (lane === CommandLane.Cron ? 1 : 0)), + nowMs: () => Date.now(), + getReplyFromConfig: replySpy, + } as HeartbeatDeps, + }); + + expect(result).toEqual({ status: "skipped", reason: HEARTBEAT_SKIP_CRON_IN_PROGRESS }); + expect(replySpy).not.toHaveBeenCalled(); + }); + }); + + it("returns lanes-busy for opt-in broader busy-lane checks", async () => { + await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => { + const cfg = createHeartbeatTelegramConfig(); + cfg.agents!.defaults!.heartbeat = { every: "30m", skipWhenBusy: true }; + await seedHeartbeatTelegramSession(storePath, cfg); + + const result = await runHeartbeatOnce({ + cfg, + deps: { + getQueueSize: vi.fn((lane?: string) => (lane === CommandLane.Subagent ? 
1 : 0)), + nowMs: () => Date.now(), + getReplyFromConfig: replySpy, + } as HeartbeatDeps, + }); + + expect(result).toEqual({ status: "skipped", reason: HEARTBEAT_SKIP_LANES_BUSY }); + expect(replySpy).not.toHaveBeenCalled(); + }); + }); + + it("returns lanes-busy for opt-in work in any session-scoped nested lane", async () => { + await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => { + const cfg = createHeartbeatTelegramConfig(); + cfg.agents!.defaults!.heartbeat = { every: "30m", skipWhenBusy: true }; + await seedHeartbeatTelegramSession(storePath, cfg); + const nestedSessionLane = resolveNestedAgentLaneForSession("agent:other:telegram:123"); + + const result = await runHeartbeatOnce({ + cfg, + deps: { + getQueueSize: vi.fn((_lane?: string) => 0), + getCommandLaneSnapshots: vi.fn(() => [createBusyLaneSnapshot(nestedSessionLane)]), + nowMs: () => Date.now(), + getReplyFromConfig: replySpy, + } as HeartbeatDeps, + }); + + expect(result).toEqual({ status: "skipped", reason: HEARTBEAT_SKIP_LANES_BUSY }); + expect(replySpy).not.toHaveBeenCalled(); + }); + }); + it("returns requests-in-flight when session lane has queued work", async () => { await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => { const cfg = createHeartbeatTelegramConfig(); @@ -93,7 +195,7 @@ describe("heartbeat runner skips when target session lane is busy", () => { expect(result.status).toBe("skipped"); if (result.status === "skipped") { - expect(result.reason).toBe("requests-in-flight"); + expect(result.reason).toBe(HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT); } expect(replySpy).not.toHaveBeenCalled(); }); diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index 7a7a7b9eb72..af259caf072 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -12,6 +12,7 @@ import { } from "../agents/agent-scope.js"; import { appendCronStyleCurrentTimeLine } from "../agents/current-time.js"; import { resolveEffectiveMessagesConfig } from 
"../agents/identity.js"; +import { isNestedAgentLane } from "../agents/lanes.js"; import { resolveEmbeddedSessionLane } from "../agents/pi-embedded-runner/lanes.js"; import { DEFAULT_HEARTBEAT_FILENAME } from "../agents/workspace.js"; import { resolveHeartbeatReplyPayload } from "../auto-reply/heartbeat-reply-payload.js"; @@ -46,10 +47,15 @@ import { } from "../config/sessions/store.js"; import type { AgentDefaultsConfig } from "../config/types.agent-defaults.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; +import { hasActiveCronJobs } from "../cron/active-jobs.js"; import { resolveCronSession } from "../cron/isolated-agent/session.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import { getActivePluginChannelRegistry } from "../plugins/runtime.js"; -import { getQueueSize } from "../process/command-queue.js"; +import { + getCommandLaneSnapshots, + getQueueSize, + type CommandLaneSnapshot, +} from "../process/command-queue.js"; import { CommandLane } from "../process/lanes.js"; import { isSubagentSessionKey, @@ -91,9 +97,13 @@ import { createHeartbeatTypingCallbacks } from "./heartbeat-typing.js"; import { resolveHeartbeatVisibility } from "./heartbeat-visibility.js"; import { areHeartbeatsEnabled, + HEARTBEAT_SKIP_CRON_IN_PROGRESS, + HEARTBEAT_SKIP_LANES_BUSY, + HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT, type HeartbeatRunResult, type HeartbeatWakeHandler, type HeartbeatWakeRequest, + isRetryableHeartbeatBusySkipReason, requestHeartbeatNow, setHeartbeatsEnabled, setHeartbeatWakeHandler, @@ -116,6 +126,7 @@ export type HeartbeatDeps = OutboundSendDeps & getReplyFromConfig?: typeof import("./heartbeat-runner.runtime.js").getReplyFromConfig; runtime?: RuntimeEnv; getQueueSize?: (lane?: string) => number; + getCommandLaneSnapshots?: () => readonly CommandLaneSnapshot[]; nowMs?: () => number; }; @@ -128,6 +139,35 @@ function loadHeartbeatRunnerRuntime() { return heartbeatRunnerRuntimePromise; } +const HEARTBEAT_ALWAYS_BUSY_LANES = 
[CommandLane.Cron, CommandLane.CronNested] as const; +const HEARTBEAT_OPT_IN_BUSY_LANES = [CommandLane.Subagent, CommandLane.Nested] as const; + +function hasQueuedWorkInLanes( + lanes: readonly string[], + getSize: (lane?: string) => number, +): boolean { + return lanes.some((lane) => getSize(lane) > 0); +} + +function hasQueuedWorkInLaneSnapshots( + snapshots: readonly CommandLaneSnapshot[], + matchesLane: (lane: string) => boolean, +): boolean { + return snapshots.some( + (snapshot) => matchesLane(snapshot.lane) && snapshot.activeCount + snapshot.queuedCount > 0, + ); +} + +function hasOptInBusyLaneWork( + getSize: (lane?: string) => number, + getSnapshots: () => readonly CommandLaneSnapshot[], +): boolean { + return ( + hasQueuedWorkInLanes(HEARTBEAT_OPT_IN_BUSY_LANES, getSize) || + hasQueuedWorkInLaneSnapshots(getSnapshots(), isNestedAgentLane) + ); +} + function resolveHeartbeatChannelPlugin(channel: string): ChannelPlugin | undefined { const activePlugin = getActivePluginChannelRegistry()?.channels.find( (entry) => entry.plugin.id === channel, @@ -755,9 +795,28 @@ export async function runHeartbeatOnce(opts: { return { status: "skipped", reason: "quiet-hours" }; } - const queueSize = (opts.deps?.getQueueSize ?? getQueueSize)(CommandLane.Main); - if (queueSize > 0) { - return { status: "skipped", reason: "requests-in-flight" }; + const getSize = opts.deps?.getQueueSize ?? getQueueSize; + const getSnapshots = opts.deps?.getCommandLaneSnapshots ?? 
getCommandLaneSnapshots; + if (getSize(CommandLane.Main) > 0) { + return { status: "skipped", reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT }; + } + + if (hasActiveCronJobs() || hasQueuedWorkInLanes(HEARTBEAT_ALWAYS_BUSY_LANES, getSize)) { + emitHeartbeatEvent({ + status: "skipped", + reason: HEARTBEAT_SKIP_CRON_IN_PROGRESS, + durationMs: Date.now() - startedAt, + }); + return { status: "skipped", reason: HEARTBEAT_SKIP_CRON_IN_PROGRESS }; + } + + if (heartbeat?.skipWhenBusy === true && hasOptInBusyLaneWork(getSize, getSnapshots)) { + emitHeartbeatEvent({ + status: "skipped", + reason: HEARTBEAT_SKIP_LANES_BUSY, + durationMs: Date.now() - startedAt, + }); + return { status: "skipped", reason: HEARTBEAT_SKIP_LANES_BUSY }; } // Preflight centralizes trigger classification, event inspection, and HEARTBEAT.md gating. @@ -782,14 +841,13 @@ export async function runHeartbeatOnce(opts: { // an active streaming turn. The wake-layer retry (heartbeat-wake.ts) will // re-schedule this wake automatically. See #14396 (closed without merge). const sessionLaneKey = resolveEmbeddedSessionLane(sessionKey); - const sessionLaneSize = (opts.deps?.getQueueSize ?? getQueueSize)(sessionLaneKey); - if (sessionLaneSize > 0) { + if (getSize(sessionLaneKey) > 0) { emitHeartbeatEvent({ status: "skipped", - reason: "requests-in-flight", + reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT, durationMs: Date.now() - startedAt, }); - return { status: "skipped", reason: "requests-in-flight" }; + return { status: "skipped", reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT }; } const previousUpdatedAt = entry?.updatedAt; @@ -1484,9 +1542,9 @@ export function startHeartbeatRunner(opts: { const startedAt = Date.now(); const now = startedAt; let ran = false; - // Track requests-in-flight so we can skip re-arm in finally — the wake + // Track retryable busy skips so we can skip re-arm in finally — the wake // layer handles retry for this case (DEFAULT_RETRY_MS = 1 s). 
- let requestsInFlight = false; + let retryableBusySkip = false; try { if (requestedSessionKey || requestedAgentId) { @@ -1504,6 +1562,10 @@ export function startHeartbeatRunner(opts: { sessionKey: requestedSessionKey, deps: { runtime: state.runtime }, }); + if (res.status === "skipped" && isRetryableHeartbeatBusySkipReason(res.reason)) { + retryableBusySkip = true; + return res; + } if (res.status !== "skipped" || res.reason !== "disabled") { advanceAgentSchedule(targetAgent, now, reason); } @@ -1538,12 +1600,12 @@ export function startHeartbeatRunner(opts: { advanceAgentSchedule(agent, now, reason); continue; } - if (res.status === "skipped" && res.reason === "requests-in-flight") { + if (res.status === "skipped" && isRetryableHeartbeatBusySkipReason(res.reason)) { // Do not advance the schedule — the main lane is busy and the wake // layer will retry shortly (DEFAULT_RETRY_MS = 1 s). Calling // scheduleNext() here would register a 0 ms timer that races with // the wake layer's 1 s retry and wins, bypassing the cooldown. - requestsInFlight = true; + retryableBusySkip = true; return res; } if (res.status !== "skipped" || res.reason !== "disabled") { @@ -1559,9 +1621,9 @@ export function startHeartbeatRunner(opts: { } return { status: "skipped", reason: isInterval ? "not-due" : "disabled" }; } finally { - // Always re-arm the timer — except for requests-in-flight, where the + // Always re-arm the timer — except for retryable busy skips, where the // wake layer (heartbeat-wake.ts) handles retry via schedule(DEFAULT_RETRY_MS). 
- if (!requestsInFlight) { + if (!retryableBusySkip) { scheduleNext(); } } diff --git a/src/infra/heartbeat-wake.test.ts b/src/infra/heartbeat-wake.test.ts index 4dd2ce30402..8419d286372 100644 --- a/src/infra/heartbeat-wake.test.ts +++ b/src/infra/heartbeat-wake.test.ts @@ -1,5 +1,8 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { + HEARTBEAT_SKIP_CRON_IN_PROGRESS, + HEARTBEAT_SKIP_LANES_BUSY, + HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT, hasHeartbeatWakeHandler, hasPendingHeartbeatWake, requestHeartbeatNow, @@ -11,7 +14,7 @@ describe("heartbeat-wake", () => { function setRetryOnceHeartbeatHandler() { const handler = vi .fn() - .mockResolvedValueOnce({ status: "skipped", reason: "requests-in-flight" }) + .mockResolvedValueOnce({ status: "skipped", reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT }) .mockResolvedValueOnce({ status: "ran", durationMs: 1 }); setHeartbeatWakeHandler(handler); return handler; @@ -72,7 +75,7 @@ describe("heartbeat-wake", () => { vi.useFakeTimers(); const handler = vi .fn() - .mockResolvedValueOnce({ status: "skipped", reason: "requests-in-flight" }) + .mockResolvedValueOnce({ status: "skipped", reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT }) .mockResolvedValueOnce({ status: "ran", durationMs: 1 }); await expectRetryAfterDefaultDelay({ handler, @@ -81,6 +84,22 @@ describe("heartbeat-wake", () => { }); }); + it.each([HEARTBEAT_SKIP_CRON_IN_PROGRESS, HEARTBEAT_SKIP_LANES_BUSY])( + "retries %s after the default retry delay", + async (reason) => { + vi.useFakeTimers(); + const handler = vi + .fn() + .mockResolvedValueOnce({ status: "skipped", reason }) + .mockResolvedValueOnce({ status: "ran", durationMs: 1 }); + await expectRetryAfterDefaultDelay({ + handler, + initialReason: "interval", + expectedRetryReason: "interval", + }); + }, + ); + it("keeps retry cooldown even when a sooner request arrives", async () => { vi.useFakeTimers(); const handler = setRetryOnceHeartbeatHandler(); @@ -219,7 +238,9 @@ 
describe("heartbeat-wake", () => { it("clears stale retry cooldown when a new handler is registered", async () => { vi.useFakeTimers(); - const handlerA = vi.fn().mockResolvedValue({ status: "skipped", reason: "requests-in-flight" }); + const handlerA = vi + .fn() + .mockResolvedValue({ status: "skipped", reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT }); setHeartbeatWakeHandler(handlerA); requestHeartbeatNow({ reason: "interval", coalesceMs: 0 }); diff --git a/src/infra/heartbeat-wake.ts b/src/infra/heartbeat-wake.ts index 5116e046e8f..629259bc7fd 100644 --- a/src/infra/heartbeat-wake.ts +++ b/src/infra/heartbeat-wake.ts @@ -10,6 +10,24 @@ export type HeartbeatRunResult = | { status: "skipped"; reason: string } | { status: "failed"; reason: string }; +export const HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT = "requests-in-flight"; +export const HEARTBEAT_SKIP_CRON_IN_PROGRESS = "cron-in-progress"; +export const HEARTBEAT_SKIP_LANES_BUSY = "lanes-busy"; +export type RetryableHeartbeatBusySkipReason = + | typeof HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT + | typeof HEARTBEAT_SKIP_CRON_IN_PROGRESS + | typeof HEARTBEAT_SKIP_LANES_BUSY; + +const RETRYABLE_BUSY_SKIP_REASONS = new Set([ + HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT, + HEARTBEAT_SKIP_CRON_IN_PROGRESS, + HEARTBEAT_SKIP_LANES_BUSY, +]); + +export function isRetryableHeartbeatBusySkipReason(reason: string): boolean { + return RETRYABLE_BUSY_SKIP_REASONS.has(reason); +} + export type HeartbeatWakeRequest = { reason?: string; agentId?: string; @@ -175,8 +193,8 @@ function schedule(coalesceMs: number, kind: WakeTimerKind = "normal") { ...(pendingWake.heartbeat ? { heartbeat: pendingWake.heartbeat } : {}), }; const res = await active(wakeOpts); - if (res.status === "skipped" && res.reason === "requests-in-flight") { - // The main lane is busy; retry this wake target soon. + if (res.status === "skipped" && isRetryableHeartbeatBusySkipReason(res.reason)) { + // The target runtime is busy; retry this wake target soon. 
queuePendingWakeReason({ reason: pendingWake.reason ?? "retry", agentId: pendingWake.agentId, diff --git a/src/process/command-queue.test.ts b/src/process/command-queue.test.ts index c08ec441874..149d12cee16 100644 --- a/src/process/command-queue.test.ts +++ b/src/process/command-queue.test.ts @@ -27,6 +27,7 @@ let enqueueCommandInLane: CommandQueueModule["enqueueCommandInLane"]; let GatewayDrainingError: CommandQueueModule["GatewayDrainingError"]; let getActiveTaskCount: CommandQueueModule["getActiveTaskCount"]; let getCommandLaneSnapshot: CommandQueueModule["getCommandLaneSnapshot"]; +let getCommandLaneSnapshots: CommandQueueModule["getCommandLaneSnapshots"]; let getQueueSize: CommandQueueModule["getQueueSize"]; let markGatewayDraining: CommandQueueModule["markGatewayDraining"]; let resetAllLanes: CommandQueueModule["resetAllLanes"]; @@ -67,6 +68,7 @@ describe("command queue", () => { GatewayDrainingError, getActiveTaskCount, getCommandLaneSnapshot, + getCommandLaneSnapshots, getQueueSize, markGatewayDraining, resetAllLanes, @@ -349,6 +351,38 @@ describe("command queue", () => { await expect(second).resolves.toBe("second"); }); + it("getCommandLaneSnapshots reports all live lanes in stable order", async () => { + const alphaLane = `snapshot-all-alpha-${Date.now()}-${Math.random().toString(16).slice(2)}`; + const betaLane = `snapshot-all-beta-${Date.now()}-${Math.random().toString(16).slice(2)}`; + setCommandLaneConcurrency(alphaLane, 1); + setCommandLaneConcurrency(betaLane, 1); + + const alphaBlocker = createDeferred(); + const betaBlocker = createDeferred(); + const alpha = enqueueCommandInLane(alphaLane, async () => { + await alphaBlocker.promise; + return "alpha"; + }); + const beta = enqueueCommandInLane(betaLane, async () => { + await betaBlocker.promise; + return "beta"; + }); + + const snapshots = getCommandLaneSnapshots().filter( + (snapshot) => snapshot.lane === alphaLane || snapshot.lane === betaLane, + ); + expect(snapshots.map((snapshot) => 
snapshot.lane)).toEqual([alphaLane, betaLane]); + expect(snapshots).toEqual([ + expect.objectContaining({ lane: alphaLane, activeCount: 1, queuedCount: 0 }), + expect.objectContaining({ lane: betaLane, activeCount: 1, queuedCount: 0 }), + ]); + + alphaBlocker.resolve(); + betaBlocker.resolve(); + await expect(alpha).resolves.toBe("alpha"); + await expect(beta).resolves.toBe("beta"); + }); + it("waitForActiveTasks ignores tasks that start after the call", async () => { const lane = `drain-snapshot-${Date.now()}-${Math.random().toString(16).slice(2)}`; setCommandLaneConcurrency(lane, 2); diff --git a/src/process/command-queue.ts b/src/process/command-queue.ts index 361d7e0d090..c640f36b939 100644 --- a/src/process/command-queue.ts +++ b/src/process/command-queue.ts @@ -103,6 +103,17 @@ function getLaneDepth(state: LaneState): number { return state.queue.length + state.activeTaskIds.size; } +function createCommandLaneSnapshot(state: LaneState): CommandLaneSnapshot { + return { + lane: state.lane, + queuedCount: state.queue.length, + activeCount: state.activeTaskIds.size, + maxConcurrent: state.maxConcurrent, + draining: state.draining, + generation: state.generation, + }; +} + function getLaneState(lane: string): LaneState { const queueState = getQueueState(); const existing = queueState.lanes.get(lane); @@ -309,14 +320,13 @@ export function getCommandLaneSnapshot(lane: string = CommandLane.Main): Command generation: 0, }; } - return { - lane: resolved, - queuedCount: state.queue.length, - activeCount: state.activeTaskIds.size, - maxConcurrent: state.maxConcurrent, - draining: state.draining, - generation: state.generation, - }; + return createCommandLaneSnapshot(state); +} + +export function getCommandLaneSnapshots(): CommandLaneSnapshot[] { + return Array.from(getQueueState().lanes.values(), createCommandLaneSnapshot).toSorted((a, b) => + a.lane.localeCompare(b.lane), + ); } export function getTotalQueueSize() {