From 7d74c29dcc07a49b5639ec7bae1415603a9e3af7 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 27 Apr 2026 09:38:57 +0100 Subject: [PATCH] fix: isolate cron nested lane concurrency --- CHANGELOG.md | 2 +- docs/automation/cron-jobs.md | 2 +- docs/concepts/queue.md | 2 +- src/agents/lanes.test.ts | 26 +++++++++++++++---- src/agents/lanes.ts | 15 ++++++++--- src/agents/pi-embedded-runner/lanes.test.ts | 5 ++-- src/agents/pi-embedded-runner/lanes.ts | 5 ++-- src/cron/isolated-agent.lane.test.ts | 8 +++--- .../isolated-agent/run-embedded.runtime.ts | 2 +- src/cron/isolated-agent/run-executor.ts | 4 +-- src/cron/isolated-agent/run.test-harness.ts | 8 +++--- src/gateway/server-lanes.test.ts | 26 ++++++++++++++++--- src/gateway/server-lanes.ts | 4 +-- src/process/lanes.ts | 1 + 14 files changed, 79 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 62086f6a5b1..4eed59bb357 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -61,7 +61,7 @@ Docs: https://docs.openclaw.ai - Google Meet: add `realtime.agentId` so live meeting consults can target a named OpenClaw agent instead of always using `main`. (#72381) Thanks @BsnizND. - Google Meet: route stateful `google_meet` tool actions through the gateway-owned runtime so created or joined realtime sessions remain visible to status, speak, and leave after the agent turn ends. Fixes #72440. (#72441) Thanks @BsnizND. - Matrix/E2EE: stabilize recovery and broken-device QA flows while avoiding Matrix device-cleanup sync races that could leave shutdown-time crypto work running. Thanks @gumadeiras. -- Cron: apply `cron.maxConcurrentRuns` to isolated agent-turn execution as well as cron dispatch, so parallel cron jobs no longer serialize on the inner nested LLM lane. Fixes #72707. Thanks @kagura-agent. +- Cron: apply `cron.maxConcurrentRuns` to a dedicated `cron-nested` isolated agent-turn lane as well as cron dispatch, so parallel cron jobs no longer serialize on inner LLM execution while non-cron nested flows keep their existing lane behavior. Fixes #72707. Thanks @kagura-agent. - Cron: treat isolated run-level agent failures as job errors even when no reply payload is produced, synthesizing a safe error payload so model/provider failures increment error counters and trigger failure notifications instead of clearing as successful. Fixes #43604; carries forward #43631. Thanks @SPFAdvisors. - Cron: preserve exact `NO_REPLY` tool results from isolated jobs with empty final assistant turns as quiet successes instead of surfacing incomplete-turn errors. Fixes #68452; carries forward #68453. Thanks @anyech. - Cron: resolve failure alerts and failure-destination announcements against `session:` targets before falling back to the creator session, so jobs created from group chats can notify the targeted direct session without cross-account routing errors. Refs #62777; carries forward #68535. Thanks @slideshow-dingo and @likewen-tech. diff --git a/docs/automation/cron-jobs.md b/docs/automation/cron-jobs.md index dfc2396c17f..8cf035b86ad 100644 --- a/docs/automation/cron-jobs.md +++ b/docs/automation/cron-jobs.md @@ -398,7 +398,7 @@ Model override note: } ``` -`maxConcurrentRuns` limits both scheduled cron dispatch and isolated agent-turn execution. Isolated cron agent turns use the queue's `nested` execution lane internally, so raising this value lets independent cron LLM runs progress in parallel instead of only starting their outer cron wrappers. +`maxConcurrentRuns` limits both scheduled cron dispatch and isolated agent-turn execution. Isolated cron agent turns use the queue's dedicated `cron-nested` execution lane internally, so raising this value lets independent cron LLM runs progress in parallel instead of only starting their outer cron wrappers. The shared non-cron `nested` lane is not widened by this setting. The runtime state sidecar is derived from `cron.store`: a `.json` store such as `~/clawd/cron/jobs.json` uses `~/clawd/cron/jobs-state.json`, while a store path without a `.json` suffix appends `-state.json`. diff --git a/docs/concepts/queue.md b/docs/concepts/queue.md index 16524951b97..fc92c82b9af 100644 --- a/docs/concepts/queue.md +++ b/docs/concepts/queue.md @@ -77,7 +77,7 @@ Defaults: `debounceMs: 1000`, `cap: 20`, `drop: summarize`. - Applies to auto-reply agent runs across all inbound channels that use the gateway reply pipeline (WhatsApp web, Telegram, Slack, Discord, Signal, iMessage, webchat, etc.). - Default lane (`main`) is process-wide for inbound + main heartbeats; set `agents.defaults.maxConcurrent` to allow multiple sessions in parallel. -- Additional lanes may exist (e.g. `cron`, `nested`, `subagent`) so background jobs can run in parallel without blocking inbound replies. Isolated cron agent turns hold a `cron` slot while their inner agent execution uses `nested`; both use `cron.maxConcurrentRuns`. These detached runs are tracked as [background tasks](/automation/tasks). +- Additional lanes may exist (e.g. `cron`, `cron-nested`, `nested`, `subagent`) so background jobs can run in parallel without blocking inbound replies. Isolated cron agent turns hold a `cron` slot while their inner agent execution uses `cron-nested`; both use `cron.maxConcurrentRuns`. Shared non-cron `nested` flows keep their own lane behavior. These detached runs are tracked as [background tasks](/automation/tasks). - Per-session lanes guarantee that only one agent run touches a given session at a time. - No external dependencies or background worker threads; pure TypeScript + promises. diff --git a/src/agents/lanes.test.ts b/src/agents/lanes.test.ts index 0e0db437b53..2c0201aacaf 100644 --- a/src/agents/lanes.test.ts +++ b/src/agents/lanes.test.ts @@ -1,7 +1,10 @@ import { describe, expect, it } from "vitest"; +import { CommandLane } from "../process/lanes.js"; import { + AGENT_LANE_CRON_NESTED, AGENT_LANE_NESTED, isNestedAgentLane, + resolveCronAgentLane, resolveNestedAgentLane, resolveNestedAgentLaneForSession, } from "./lanes.js"; @@ -11,14 +14,27 @@ describe("resolveNestedAgentLane", () => { expect(resolveNestedAgentLane()).toBe(AGENT_LANE_NESTED); }); - it("moves cron lane callers onto the nested lane", () => { - expect(resolveNestedAgentLane("cron")).toBe(AGENT_LANE_NESTED); - expect(resolveNestedAgentLane(" cron ")).toBe(AGENT_LANE_NESTED); + it("preserves explicit lanes", () => { + expect(resolveNestedAgentLane("cron")).toBe(CommandLane.Cron); + expect(resolveNestedAgentLane(" cron ")).toBe(CommandLane.Cron); + expect(resolveNestedAgentLane("subagent")).toBe("subagent"); + expect(resolveNestedAgentLane(" custom-lane ")).toBe("custom-lane"); + }); +}); + +describe("resolveCronAgentLane", () => { + it("defaults cron-owned runs to the cron-nested lane", () => { + expect(resolveCronAgentLane()).toBe(AGENT_LANE_CRON_NESTED); + }); + + it("moves cron lane callers onto the cron-nested lane", () => { + expect(resolveCronAgentLane("cron")).toBe(AGENT_LANE_CRON_NESTED); + expect(resolveCronAgentLane(" cron ")).toBe(AGENT_LANE_CRON_NESTED); }); it("preserves non-cron lanes", () => { - expect(resolveNestedAgentLane("subagent")).toBe("subagent"); - expect(resolveNestedAgentLane(" custom-lane ")).toBe("custom-lane"); + expect(resolveCronAgentLane("subagent")).toBe("subagent"); + expect(resolveCronAgentLane(" custom-lane ")).toBe("custom-lane"); }); }); diff --git a/src/agents/lanes.ts b/src/agents/lanes.ts index 68086a2b742..adb4e0a9393 100644 --- a/src/agents/lanes.ts +++ b/src/agents/lanes.ts @@ -1,20 +1,29 @@ import { CommandLane } from "../process/lanes.js"; export const AGENT_LANE_NESTED = CommandLane.Nested; +export const AGENT_LANE_CRON_NESTED = CommandLane.CronNested; export const AGENT_LANE_SUBAGENT = CommandLane.Subagent; const NESTED_LANE = "nested"; const NESTED_LANE_PREFIX = `${NESTED_LANE}:`; export function resolveNestedAgentLane(lane?: string): string { const trimmed = lane?.trim(); - // Nested agent runs should not inherit the cron execution lane. Cron jobs - // already occupy that lane while they dispatch inner work. - if (!trimmed || trimmed === "cron") { + if (!trimmed) { return AGENT_LANE_NESTED; } return trimmed; } +export function resolveCronAgentLane(lane?: string): string { + const trimmed = lane?.trim(); + // Cron jobs already occupy the outer cron lane, so inner agent work needs + // its own lane to avoid self-deadlock without widening shared nested flows. + if (!trimmed || trimmed === CommandLane.Cron) { + return AGENT_LANE_CRON_NESTED; + } + return trimmed; +} + export function resolveNestedAgentLaneForSession(sessionKey: string | undefined): string { const trimmed = sessionKey?.trim(); if (!trimmed) { diff --git a/src/agents/pi-embedded-runner/lanes.test.ts b/src/agents/pi-embedded-runner/lanes.test.ts index c0294dd5b9d..c699e674a0d 100644 --- a/src/agents/pi-embedded-runner/lanes.test.ts +++ b/src/agents/pi-embedded-runner/lanes.test.ts @@ -10,12 +10,12 @@ describe("resolveGlobalLane", () => { } }); - it("maps cron lane to nested lane to prevent deadlocks", () => { + it("maps cron lane to cron-nested lane to prevent deadlocks", () => { // When cron jobs trigger nested agent runs, the outer execution holds // the cron lane slot. Inner work must use a separate lane to avoid // deadlock. See: https://github.com/openclaw/openclaw/issues/44805 for (const lane of ["cron", " cron "]) { - expect(resolveGlobalLane(lane)).toBe(CommandLane.Nested); + expect(resolveGlobalLane(lane)).toBe(CommandLane.CronNested); } }); @@ -23,6 +23,7 @@ describe("resolveGlobalLane", () => { for (const [lane, expected] of [ ["main", CommandLane.Main], ["subagent", CommandLane.Subagent], + ["cron-nested", CommandLane.CronNested], ["nested", CommandLane.Nested], ["custom-lane", "custom-lane"], [" custom ", "custom"], diff --git a/src/agents/pi-embedded-runner/lanes.ts b/src/agents/pi-embedded-runner/lanes.ts index 57ffd1b4255..26619d67cac 100644 --- a/src/agents/pi-embedded-runner/lanes.ts +++ b/src/agents/pi-embedded-runner/lanes.ts @@ -7,9 +7,10 @@ export function resolveSessionLane(key: string) { export function resolveGlobalLane(lane?: string) { const cleaned = lane?.trim(); - // Cron jobs hold the cron lane slot; inner operations must use nested to avoid deadlock. + // Cron jobs hold the cron lane slot; inner operations need a dedicated lane + // to avoid deadlock without widening shared nested flows. if (cleaned === CommandLane.Cron) { - return CommandLane.Nested; + return CommandLane.CronNested; } return cleaned ? cleaned : CommandLane.Main; } diff --git a/src/cron/isolated-agent.lane.test.ts b/src/cron/isolated-agent.lane.test.ts index aa1b81a1df6..59a4e510e85 100644 --- a/src/cron/isolated-agent.lane.test.ts +++ b/src/cron/isolated-agent.lane.test.ts @@ -82,15 +82,15 @@ describe("runCronIsolatedAgentTurn lane selection", () => { vi.resetModules(); }); - it("moves the cron lane to nested for embedded runs", async () => { + it("moves the cron lane to cron-nested for embedded runs", async () => { await withTempCronHome(async (home) => { - expect(await runLaneCase(home, "cron")).toBe("nested"); + expect(await runLaneCase(home, "cron")).toBe("cron-nested"); }); }); - it("defaults missing lanes to nested for embedded runs", async () => { + it("defaults missing lanes to cron-nested for embedded runs", async () => { await withTempCronHome(async (home) => { - expect(await runLaneCase(home)).toBe("nested"); + expect(await runLaneCase(home)).toBe("cron-nested"); }); }); diff --git a/src/cron/isolated-agent/run-embedded.runtime.ts b/src/cron/isolated-agent/run-embedded.runtime.ts index 6828dd4e0c2..765e19177a1 100644 --- a/src/cron/isolated-agent/run-embedded.runtime.ts +++ b/src/cron/isolated-agent/run-embedded.runtime.ts @@ -1,3 +1,3 @@ export { resolveFastModeState } from "../../agents/fast-mode.js"; -export { resolveNestedAgentLane } from "../../agents/lanes.js"; +export { resolveCronAgentLane } from "../../agents/lanes.js"; export { runEmbeddedPiAgent } from "../../agents/pi-embedded.js"; diff --git a/src/cron/isolated-agent/run-executor.ts b/src/cron/isolated-agent/run-executor.ts index 927a5f14d8f..e50c66f6a75 100644 --- a/src/cron/isolated-agent/run-executor.ts +++ b/src/cron/isolated-agent/run-executor.ts @@ -156,7 +156,7 @@ export function createCronPromptExecutor(params: { ); return result; } - const { resolveFastModeState, resolveNestedAgentLane, runEmbeddedPiAgent } = + const { resolveCronAgentLane, resolveFastModeState, runEmbeddedPiAgent } = await loadCronEmbeddedRuntime(); const currentChannelId = await resolveCurrentChannelTarget({ channel: params.messageChannel, @@ -183,7 +183,7 @@ export function createCronPromptExecutor(params: { config: params.cfgWithAgentDefaults, skillsSnapshot: params.skillsSnapshot, prompt: promptText, - lane: resolveNestedAgentLane(params.lane), + lane: resolveCronAgentLane(params.lane), provider: providerOverride, model: modelOverride, authProfileId: params.liveSelection.authProfileId, diff --git a/src/cron/isolated-agent/run.test-harness.ts b/src/cron/isolated-agent/run.test-harness.ts index 3bd450ac623..22d294f701c 100644 --- a/src/cron/isolated-agent/run.test-harness.ts +++ b/src/cron/isolated-agent/run.test-harness.ts @@ -73,7 +73,7 @@ export const retireSessionMcpRuntimeMock = createMock(); const resolveBootstrapWarningSignaturesSeenMock = createMock(); const resolveCronStyleNowMock = createMock(); -const resolveNestedAgentLaneMock = createMock(); +const resolveCronAgentLaneMock = createMock(); const resolveAgentTimeoutMsMock = createMock(); const deriveSessionTotalTokensMock = createMock(); const hasNonzeroUsageMock = createMock(); @@ -165,7 +165,7 @@ vi.mock("./run-execution.runtime.js", () => ({ getCliSessionId: getCliSessionIdMock, runCliAgent: runCliAgentMock, resolveFastModeState: resolveFastModeStateMock, - resolveNestedAgentLane: resolveNestedAgentLaneMock, + resolveCronAgentLane: resolveCronAgentLaneMock, LiveSessionModelSwitchError, runWithModelFallback: runWithModelFallbackMock, isCliProvider: isCliProviderMock, @@ -184,7 +184,7 @@ vi.mock("./run-auth-profile.runtime.js", () => ({ vi.mock("./run-embedded.runtime.js", () => ({ resolveFastModeState: resolveFastModeStateMock, - resolveNestedAgentLane: resolveNestedAgentLaneMock, + resolveCronAgentLane: resolveCronAgentLaneMock, runEmbeddedPiAgent: runEmbeddedPiAgentMock, })); @@ -339,7 +339,7 @@ function resetRunExecutionMocks(): void { isCliProviderMock.mockReturnValue(false); resolveBootstrapWarningSignaturesSeenMock.mockReturnValue(new Set()); resolveFastModeStateMock.mockImplementation((params) => resolveFastModeStateImpl(params)); - resolveNestedAgentLaneMock.mockReturnValue(undefined); + resolveCronAgentLaneMock.mockReturnValue(undefined); normalizeVerboseLevelMock.mockImplementation((value: unknown) => value ?? "off"); resolveSessionTranscriptPathMock.mockReturnValue("/tmp/transcript.jsonl"); registerAgentRunContextMock.mockReturnValue(undefined); diff --git a/src/gateway/server-lanes.test.ts b/src/gateway/server-lanes.test.ts index ef8ae324c9f..bd2ea60aa0b 100644 --- a/src/gateway/server-lanes.test.ts +++ b/src/gateway/server-lanes.test.ts @@ -19,7 +19,7 @@ describe("applyGatewayLaneConcurrency", () => { resetCommandQueueStateForTest(); }); - it("applies cron maxConcurrentRuns to the nested lane used by cron agent turns", async () => { + it("applies cron maxConcurrentRuns to the cron-nested lane used by cron agent turns", async () => { applyGatewayLaneConcurrency({ cron: { maxConcurrentRuns: 2 } } as OpenClawConfig); let activeRuns = 0; @@ -40,8 +40,8 @@ describe("applyGatewayLaneConcurrency", () => { } }; - const first = enqueueCommandInLane(CommandLane.Nested, run, { warnAfterMs: 10_000 }); - const second = enqueueCommandInLane(CommandLane.Nested, run, { warnAfterMs: 10_000 }); + const first = enqueueCommandInLane(CommandLane.CronNested, run, { warnAfterMs: 10_000 }); + const second = enqueueCommandInLane(CommandLane.CronNested, run, { warnAfterMs: 10_000 }); const timeout = setTimeout(() => { bothRunsStarted.reject( new Error("timed out waiting for nested cron work to run in parallel"), @@ -57,4 +57,24 @@ describe("applyGatewayLaneConcurrency", () => { await Promise.all([first, second]); } }); + + it("keeps the shared nested lane at its default concurrency", async () => { + applyGatewayLaneConcurrency({ cron: { maxConcurrentRuns: 2 } } as OpenClawConfig); + + let startedRuns = 0; + const releaseRuns = createDeferred(); + const run = async () => { + startedRuns += 1; + await releaseRuns.promise; + }; + + const first = enqueueCommandInLane(CommandLane.Nested, run, { warnAfterMs: 10_000 }); + const second = enqueueCommandInLane(CommandLane.Nested, run, { warnAfterMs: 10_000 }); + await Promise.resolve(); + + expect(startedRuns).toBe(1); + + releaseRuns.resolve(); + await Promise.all([first, second]); + }); }); diff --git a/src/gateway/server-lanes.ts b/src/gateway/server-lanes.ts index 7a366884474..16e77f70082 100644 --- a/src/gateway/server-lanes.ts +++ b/src/gateway/server-lanes.ts @@ -6,8 +6,8 @@ import { CommandLane } from "../process/lanes.js"; export function applyGatewayLaneConcurrency(cfg: OpenClawConfig) { const cronMaxConcurrentRuns = cfg.cron?.maxConcurrentRuns ?? 1; setCommandLaneConcurrency(CommandLane.Cron, cronMaxConcurrentRuns); - // Cron isolated agent turns remap their inner LLM work to the nested lane. - setCommandLaneConcurrency(CommandLane.Nested, cronMaxConcurrentRuns); + // Cron isolated agent turns remap inner LLM work to this lane. + setCommandLaneConcurrency(CommandLane.CronNested, cronMaxConcurrentRuns); setCommandLaneConcurrency(CommandLane.Main, resolveAgentMaxConcurrent(cfg)); setCommandLaneConcurrency(CommandLane.Subagent, resolveSubagentMaxConcurrent(cfg)); } diff --git a/src/process/lanes.ts b/src/process/lanes.ts index 63ef1f534f9..6326c73f607 100644 --- a/src/process/lanes.ts +++ b/src/process/lanes.ts @@ -1,6 +1,7 @@ export const enum CommandLane { Main = "main", Cron = "cron", + CronNested = "cron-nested", Subagent = "subagent", Nested = "nested", }