mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:10:44 +00:00
fix: apply cron concurrency to nested lane
This commit is contained in:
@@ -58,6 +58,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Google Meet: add `realtime.agentId` so live meeting consults can target a named OpenClaw agent instead of always using `main`. (#72381) Thanks @BsnizND.
|
||||
- Google Meet: route stateful `google_meet` tool actions through the gateway-owned runtime so created or joined realtime sessions remain visible to status, speak, and leave after the agent turn ends. Fixes #72440. (#72441) Thanks @BsnizND.
|
||||
- Matrix/E2EE: stabilize recovery and broken-device QA flows while avoiding Matrix device-cleanup sync races that could leave shutdown-time crypto work running. Thanks @gumadeiras.
|
||||
- Cron: apply `cron.maxConcurrentRuns` to isolated agent-turn execution as well as cron dispatch, so parallel cron jobs no longer serialize on the inner nested LLM lane. Fixes #72707. Thanks @kagura-agent.
|
||||
- Cron: treat isolated run-level agent failures as job errors even when no reply payload is produced, synthesizing a safe error payload so model/provider failures increment error counters and trigger failure notifications instead of clearing as successful. Fixes #43604; carries forward #43631. Thanks @SPFAdvisors.
|
||||
- Cron: preserve exact `NO_REPLY` tool results from isolated jobs with empty final assistant turns as quiet successes instead of surfacing incomplete-turn errors. Fixes #68452; carries forward #68453. Thanks @anyech.
|
||||
- Cron: resolve failure alerts and failure-destination announcements against `session:<id>` targets before falling back to the creator session, so jobs created from group chats can notify the targeted direct session without cross-account routing errors. Refs #62777; carries forward #68535. Thanks @slideshow-dingo and @likewen-tech.
|
||||
|
||||
@@ -398,6 +398,8 @@ Model override note:
|
||||
}
|
||||
```
|
||||
|
||||
`maxConcurrentRuns` limits both scheduled cron dispatch and isolated agent-turn execution. Isolated cron agent turns use the queue's `nested` execution lane internally, so raising this value lets independent cron LLM runs progress in parallel instead of only starting their outer cron wrappers.
|
||||
|
||||
The runtime state sidecar is derived from `cron.store`: a `.json` store such as `~/clawd/cron/jobs.json` uses `~/clawd/cron/jobs-state.json`, while a store path without a `.json` suffix appends `-state.json`.
|
||||
|
||||
If you hand-edit `jobs.json`, leave `jobs-state.json` out of source control. OpenClaw uses that sidecar for pending slots, active markers, last-run metadata, and the schedule identity that tells the scheduler when an externally edited job needs a fresh `nextRunAtMs`.
|
||||
|
||||
@@ -77,7 +77,7 @@ Defaults: `debounceMs: 1000`, `cap: 20`, `drop: summarize`.
|
||||
|
||||
- Applies to auto-reply agent runs across all inbound channels that use the gateway reply pipeline (WhatsApp web, Telegram, Slack, Discord, Signal, iMessage, webchat, etc.).
|
||||
- Default lane (`main`) is process-wide for inbound + main heartbeats; set `agents.defaults.maxConcurrent` to allow multiple sessions in parallel.
|
||||
- Additional lanes may exist (e.g. `cron`, `subagent`) so background jobs can run in parallel without blocking inbound replies. These detached runs are tracked as [background tasks](/automation/tasks).
|
||||
- Additional lanes may exist (e.g. `cron`, `nested`, `subagent`) so background jobs can run in parallel without blocking inbound replies. Isolated cron agent turns hold a `cron` slot while their inner agent execution uses `nested`; both use `cron.maxConcurrentRuns`. These detached runs are tracked as [background tasks](/automation/tasks).
|
||||
- Per-session lanes guarantee that only one agent run touches a given session at a time.
|
||||
- No external dependencies or background worker threads; pure TypeScript + promises.
|
||||
|
||||
|
||||
60
src/gateway/server-lanes.test.ts
Normal file
60
src/gateway/server-lanes.test.ts
Normal file
@@ -0,0 +1,60 @@
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import { enqueueCommandInLane, resetCommandQueueStateForTest } from "../process/command-queue.js";
|
||||
import { CommandLane } from "../process/lanes.js";
|
||||
import { applyGatewayLaneConcurrency } from "./server-lanes.js";
|
||||
|
||||
/**
 * Creates a promise together with the functions that settle it, so a test can
 * resolve or reject the promise from outside the executor callback.
 *
 * @returns An object holding the `promise` and its `resolve` / `reject` functions.
 */
function createDeferred<T>() {
  // The Promise executor runs synchronously, so both settlers are assigned
  // before this function returns; the `!` tells the compiler as much.
  let resolve!: (value: T | PromiseLike<T>) => void;
  let reject!: (reason?: unknown) => void;
  const promise = new Promise<T>((res, rej) => {
    resolve = res;
    reject = rej;
  });
  return { promise, resolve, reject };
}
|
||||
|
||||
describe("applyGatewayLaneConcurrency", () => {
  // Reset the shared command-queue state after each test so lane settings
  // applied in one test are not carried into the next.
  afterEach(() => {
    resetCommandQueueStateForTest();
  });

  it("applies cron maxConcurrentRuns to the nested lane used by cron agent turns", async () => {
    applyGatewayLaneConcurrency({ cron: { maxConcurrentRuns: 2 } } as OpenClawConfig);

    let activeRuns = 0;
    let peakActiveRuns = 0;
    const bothRunsStarted = createDeferred<void>();
    const releaseRuns = createDeferred<void>();

    // Each run records how many runs are active, then parks until released.
    // The peak can only reach 2 if both runs are in flight at the same time.
    const run = async () => {
      activeRuns += 1;
      peakActiveRuns = Math.max(peakActiveRuns, activeRuns);
      if (peakActiveRuns >= 2) {
        bothRunsStarted.resolve();
      }
      try {
        await releaseRuns.promise;
      } finally {
        activeRuns -= 1;
      }
    };

    const first = enqueueCommandInLane(CommandLane.Nested, run, { warnAfterMs: 10_000 });
    const second = enqueueCommandInLane(CommandLane.Nested, run, { warnAfterMs: 10_000 });

    // Fail fast with a descriptive error instead of hanging when the second
    // run never starts while the first is still parked.
    const timeout = setTimeout(() => {
      bothRunsStarted.reject(
        new Error("timed out waiting for nested cron work to run in parallel"),
      );
    }, 250);

    try {
      await bothRunsStarted.promise;
      expect(peakActiveRuns).toBe(2);
    } finally {
      // Always release the parked runs and wait for both queue entries to
      // settle, whether the assertion passed or the timeout fired.
      clearTimeout(timeout);
      releaseRuns.resolve();
      await Promise.all([first, second]);
    }
  });
});
|
||||
@@ -4,7 +4,10 @@ import { setCommandLaneConcurrency } from "../process/command-queue.js";
|
||||
import { CommandLane } from "../process/lanes.js";
|
||||
|
||||
/**
 * Applies the configured concurrency limits to the gateway command lanes.
 *
 * The cron and nested lanes both use `cfg.cron.maxConcurrentRuns`, defaulting
 * to 1 when it is unset. The main and subagent lanes use the limits resolved
 * by `resolveAgentMaxConcurrent` and `resolveSubagentMaxConcurrent`.
 *
 * @param cfg - Configuration whose limits are applied to the lanes.
 */
export function applyGatewayLaneConcurrency(cfg: OpenClawConfig) {
  const cronMaxConcurrentRuns = cfg.cron?.maxConcurrentRuns ?? 1;
  setCommandLaneConcurrency(CommandLane.Cron, cronMaxConcurrentRuns);
  // Cron isolated agent turns remap their inner LLM work to the nested lane.
  setCommandLaneConcurrency(CommandLane.Nested, cronMaxConcurrentRuns);
  setCommandLaneConcurrency(CommandLane.Main, resolveAgentMaxConcurrent(cfg));
  setCommandLaneConcurrency(CommandLane.Subagent, resolveSubagentMaxConcurrent(cfg));
}
|
||||
|
||||
@@ -3,7 +3,6 @@ import { disposeAllSessionMcpRuntimes } from "../agents/pi-bundle-mcp-tools.js";
|
||||
import { getActiveEmbeddedRunCount } from "../agents/pi-embedded-runner/run-state.js";
|
||||
import { getTotalPendingReplies } from "../auto-reply/reply/dispatcher-registry.js";
|
||||
import type { CliDeps } from "../cli/deps.types.js";
|
||||
import { resolveAgentMaxConcurrent, resolveSubagentMaxConcurrent } from "../config/agent-limits.js";
|
||||
import { isRestartEnabled } from "../config/commands.flags.js";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import { startGmailWatcherWithLogs } from "../hooks/gmail-watcher-lifecycle.js";
|
||||
@@ -16,8 +15,7 @@ import {
|
||||
emitGatewayRestart,
|
||||
setGatewaySigusr1RestartPolicy,
|
||||
} from "../infra/restart.js";
|
||||
import { setCommandLaneConcurrency, getTotalQueueSize } from "../process/command-queue.js";
|
||||
import { CommandLane } from "../process/lanes.js";
|
||||
import { getTotalQueueSize } from "../process/command-queue.js";
|
||||
import {
|
||||
activateSecretsRuntimeSnapshot,
|
||||
clearSecretsRuntimeSnapshot,
|
||||
@@ -30,6 +28,7 @@ import type { ChannelKind } from "./config-reload-plan.js";
|
||||
import { startGatewayConfigReloader, type GatewayReloadPlan } from "./config-reload.js";
|
||||
import { resolveHooksConfig } from "./hooks.js";
|
||||
import { buildGatewayCronService, type GatewayCronState } from "./server-cron.js";
|
||||
import { applyGatewayLaneConcurrency } from "./server-lanes.js";
|
||||
import {
|
||||
type GatewayChannelManager,
|
||||
startGatewayChannelHealthMonitor,
|
||||
@@ -302,9 +301,7 @@ export function createGatewayReloadHandlers(params: GatewayReloadHandlerParams)
|
||||
}
|
||||
}
|
||||
|
||||
setCommandLaneConcurrency(CommandLane.Cron, nextConfig.cron?.maxConcurrentRuns ?? 1);
|
||||
setCommandLaneConcurrency(CommandLane.Main, resolveAgentMaxConcurrent(nextConfig));
|
||||
setCommandLaneConcurrency(CommandLane.Subagent, resolveSubagentMaxConcurrent(nextConfig));
|
||||
applyGatewayLaneConcurrency(nextConfig);
|
||||
|
||||
if (plan.hotReasons.length > 0) {
|
||||
params.logReload.info(`config hot reload applied (${plan.hotReasons.join(", ")})`);
|
||||
|
||||
Reference in New Issue
Block a user