diff --git a/CHANGELOG.md b/CHANGELOG.md index c4f6a23a895..df4c46920e8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -58,6 +58,7 @@ Docs: https://docs.openclaw.ai - Google Meet: add `realtime.agentId` so live meeting consults can target a named OpenClaw agent instead of always using `main`. (#72381) Thanks @BsnizND. - Google Meet: route stateful `google_meet` tool actions through the gateway-owned runtime so created or joined realtime sessions remain visible to status, speak, and leave after the agent turn ends. Fixes #72440. (#72441) Thanks @BsnizND. - Matrix/E2EE: stabilize recovery and broken-device QA flows while avoiding Matrix device-cleanup sync races that could leave shutdown-time crypto work running. Thanks @gumadeiras. +- Cron: apply `cron.maxConcurrentRuns` to isolated agent-turn execution as well as cron dispatch, so parallel cron jobs no longer serialize on the inner nested LLM lane. Fixes #72707. Thanks @kagura-agent. - Cron: treat isolated run-level agent failures as job errors even when no reply payload is produced, synthesizing a safe error payload so model/provider failures increment error counters and trigger failure notifications instead of clearing as successful. Fixes #43604; carries forward #43631. Thanks @SPFAdvisors. - Cron: preserve exact `NO_REPLY` tool results from isolated jobs with empty final assistant turns as quiet successes instead of surfacing incomplete-turn errors. Fixes #68452; carries forward #68453. Thanks @anyech. - Cron: resolve failure alerts and failure-destination announcements against `session:` targets before falling back to the creator session, so jobs created from group chats can notify the targeted direct session without cross-account routing errors. Refs #62777; carries forward #68535. Thanks @slideshow-dingo and @likewen-tech. diff --git a/docs/automation/cron-jobs.md b/docs/automation/cron-jobs.md index e82549a54b4..dfc2396c17f 100644 --- a/docs/automation/cron-jobs.md +++ b/docs/automation/cron-jobs.md @@ -398,6 +398,8 @@ Model override note: } ``` +`maxConcurrentRuns` limits both scheduled cron dispatch and isolated agent-turn execution. Isolated cron agent turns use the queue's `nested` execution lane internally, so raising this value lets independent cron LLM runs progress in parallel instead of only starting their outer cron wrappers. + The runtime state sidecar is derived from `cron.store`: a `.json` store such as `~/clawd/cron/jobs.json` uses `~/clawd/cron/jobs-state.json`, while a store path without a `.json` suffix appends `-state.json`. If you hand-edit `jobs.json`, leave `jobs-state.json` out of source control. OpenClaw uses that sidecar for pending slots, active markers, last-run metadata, and the schedule identity that tells the scheduler when an externally edited job needs a fresh `nextRunAtMs`. diff --git a/docs/concepts/queue.md b/docs/concepts/queue.md index fe220781c2c..16524951b97 100644 --- a/docs/concepts/queue.md +++ b/docs/concepts/queue.md @@ -77,7 +77,7 @@ Defaults: `debounceMs: 1000`, `cap: 20`, `drop: summarize`. - Applies to auto-reply agent runs across all inbound channels that use the gateway reply pipeline (WhatsApp web, Telegram, Slack, Discord, Signal, iMessage, webchat, etc.). - Default lane (`main`) is process-wide for inbound + main heartbeats; set `agents.defaults.maxConcurrent` to allow multiple sessions in parallel. -- Additional lanes may exist (e.g. `cron`, `subagent`) so background jobs can run in parallel without blocking inbound replies. These detached runs are tracked as [background tasks](/automation/tasks). +- Additional lanes may exist (e.g. `cron`, `nested`, `subagent`) so background jobs can run in parallel without blocking inbound replies. Isolated cron agent turns hold a `cron` slot while their inner agent execution uses `nested`; both use `cron.maxConcurrentRuns`. These detached runs are tracked as [background tasks](/automation/tasks). - Per-session lanes guarantee that only one agent run touches a given session at a time. - No external dependencies or background worker threads; pure TypeScript + promises. diff --git a/src/gateway/server-lanes.test.ts b/src/gateway/server-lanes.test.ts new file mode 100644 index 00000000000..ef8ae324c9f --- /dev/null +++ b/src/gateway/server-lanes.test.ts @@ -0,0 +1,60 @@ +import { afterEach, describe, expect, it } from "vitest"; +import type { OpenClawConfig } from "../config/types.openclaw.js"; +import { enqueueCommandInLane, resetCommandQueueStateForTest } from "../process/command-queue.js"; +import { CommandLane } from "../process/lanes.js"; +import { applyGatewayLaneConcurrency } from "./server-lanes.js"; + +function createDeferred() { + let resolve!: (value: T | PromiseLike) => void; + let reject!: (reason?: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + +describe("applyGatewayLaneConcurrency", () => { + afterEach(() => { + resetCommandQueueStateForTest(); + }); + + it("applies cron maxConcurrentRuns to the nested lane used by cron agent turns", async () => { + applyGatewayLaneConcurrency({ cron: { maxConcurrentRuns: 2 } } as OpenClawConfig); + + let activeRuns = 0; + let peakActiveRuns = 0; + const bothRunsStarted = createDeferred(); + const releaseRuns = createDeferred(); + + const run = async () => { + activeRuns += 1; + peakActiveRuns = Math.max(peakActiveRuns, activeRuns); + if (peakActiveRuns >= 2) { + bothRunsStarted.resolve(); + } + try { + await releaseRuns.promise; + } finally { + activeRuns -= 1; + } + }; + + const first = enqueueCommandInLane(CommandLane.Nested, run, { warnAfterMs: 10_000 }); + const second = enqueueCommandInLane(CommandLane.Nested, run, { warnAfterMs: 10_000 }); + const timeout = setTimeout(() => { + bothRunsStarted.reject( + new Error("timed out waiting for nested cron work to run in parallel"), + ); + }, 250); + + try { + await bothRunsStarted.promise; + expect(peakActiveRuns).toBe(2); + } finally { + clearTimeout(timeout); + releaseRuns.resolve(); + await Promise.all([first, second]); + } + }); +}); diff --git a/src/gateway/server-lanes.ts b/src/gateway/server-lanes.ts index 50232919837..7a366884474 100644 --- a/src/gateway/server-lanes.ts +++ b/src/gateway/server-lanes.ts @@ -4,7 +4,10 @@ import { setCommandLaneConcurrency } from "../process/command-queue.js"; import { CommandLane } from "../process/lanes.js"; export function applyGatewayLaneConcurrency(cfg: OpenClawConfig) { - setCommandLaneConcurrency(CommandLane.Cron, cfg.cron?.maxConcurrentRuns ?? 1); + const cronMaxConcurrentRuns = cfg.cron?.maxConcurrentRuns ?? 1; + setCommandLaneConcurrency(CommandLane.Cron, cronMaxConcurrentRuns); + // Cron isolated agent turns remap their inner LLM work to the nested lane. + setCommandLaneConcurrency(CommandLane.Nested, cronMaxConcurrentRuns); setCommandLaneConcurrency(CommandLane.Main, resolveAgentMaxConcurrent(cfg)); setCommandLaneConcurrency(CommandLane.Subagent, resolveSubagentMaxConcurrent(cfg)); } diff --git a/src/gateway/server-reload-handlers.ts b/src/gateway/server-reload-handlers.ts index 59f8304967f..2468b0167e7 100644 --- a/src/gateway/server-reload-handlers.ts +++ b/src/gateway/server-reload-handlers.ts @@ -3,7 +3,6 @@ import { disposeAllSessionMcpRuntimes } from "../agents/pi-bundle-mcp-tools.js"; import { getActiveEmbeddedRunCount } from "../agents/pi-embedded-runner/run-state.js"; import { getTotalPendingReplies } from "../auto-reply/reply/dispatcher-registry.js"; import type { CliDeps } from "../cli/deps.types.js"; -import { resolveAgentMaxConcurrent, resolveSubagentMaxConcurrent } from "../config/agent-limits.js"; import { isRestartEnabled } from "../config/commands.flags.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { startGmailWatcherWithLogs } from "../hooks/gmail-watcher-lifecycle.js"; @@ -16,8 +15,7 @@ import { emitGatewayRestart, setGatewaySigusr1RestartPolicy, } from "../infra/restart.js"; -import { setCommandLaneConcurrency, getTotalQueueSize } from "../process/command-queue.js"; -import { CommandLane } from "../process/lanes.js"; +import { getTotalQueueSize } from "../process/command-queue.js"; import { activateSecretsRuntimeSnapshot, clearSecretsRuntimeSnapshot, @@ -30,6 +28,7 @@ import type { ChannelKind } from "./config-reload-plan.js"; import { startGatewayConfigReloader, type GatewayReloadPlan } from "./config-reload.js"; import { resolveHooksConfig } from "./hooks.js"; import { buildGatewayCronService, type GatewayCronState } from "./server-cron.js"; +import { applyGatewayLaneConcurrency } from "./server-lanes.js"; import { type GatewayChannelManager, startGatewayChannelHealthMonitor, @@ -302,9 +301,7 @@ export function createGatewayReloadHandlers(params: GatewayReloadHandlerParams) } } - setCommandLaneConcurrency(CommandLane.Cron, nextConfig.cron?.maxConcurrentRuns ?? 1); - setCommandLaneConcurrency(CommandLane.Main, resolveAgentMaxConcurrent(nextConfig)); - setCommandLaneConcurrency(CommandLane.Subagent, resolveSubagentMaxConcurrent(nextConfig)); + applyGatewayLaneConcurrency(nextConfig); if (plan.hotReasons.length > 0) { params.logReload.info(`config hot reload applied (${plan.hotReasons.join(", ")})`);