mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 17:40:44 +00:00
fix: isolate cron nested lane concurrency
This commit is contained in:
@@ -61,7 +61,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Google Meet: add `realtime.agentId` so live meeting consults can target a named OpenClaw agent instead of always using `main`. (#72381) Thanks @BsnizND.
|
||||
- Google Meet: route stateful `google_meet` tool actions through the gateway-owned runtime so created or joined realtime sessions remain visible to status, speak, and leave after the agent turn ends. Fixes #72440. (#72441) Thanks @BsnizND.
|
||||
- Matrix/E2EE: stabilize recovery and broken-device QA flows while avoiding Matrix device-cleanup sync races that could leave shutdown-time crypto work running. Thanks @gumadeiras.
|
||||
- Cron: apply `cron.maxConcurrentRuns` to isolated agent-turn execution as well as cron dispatch, so parallel cron jobs no longer serialize on the inner nested LLM lane. Fixes #72707. Thanks @kagura-agent.
|
||||
- Cron: apply `cron.maxConcurrentRuns` to a dedicated `cron-nested` isolated agent-turn lane as well as cron dispatch, so parallel cron jobs no longer serialize on inner LLM execution while non-cron nested flows keep their existing lane behavior. Fixes #72707. Thanks @kagura-agent.
|
||||
- Cron: treat isolated run-level agent failures as job errors even when no reply payload is produced, synthesizing a safe error payload so model/provider failures increment error counters and trigger failure notifications instead of clearing as successful. Fixes #43604; carries forward #43631. Thanks @SPFAdvisors.
|
||||
- Cron: preserve exact `NO_REPLY` tool results from isolated jobs with empty final assistant turns as quiet successes instead of surfacing incomplete-turn errors. Fixes #68452; carries forward #68453. Thanks @anyech.
|
||||
- Cron: resolve failure alerts and failure-destination announcements against `session:<id>` targets before falling back to the creator session, so jobs created from group chats can notify the targeted direct session without cross-account routing errors. Refs #62777; carries forward #68535. Thanks @slideshow-dingo and @likewen-tech.
|
||||
|
||||
@@ -398,7 +398,7 @@ Model override note:
|
||||
}
|
||||
```
|
||||
|
||||
`maxConcurrentRuns` limits both scheduled cron dispatch and isolated agent-turn execution. Isolated cron agent turns use the queue's `nested` execution lane internally, so raising this value lets independent cron LLM runs progress in parallel instead of only starting their outer cron wrappers.
|
||||
`maxConcurrentRuns` limits both scheduled cron dispatch and isolated agent-turn execution. Isolated cron agent turns use the queue's dedicated `cron-nested` execution lane internally, so raising this value lets independent cron LLM runs progress in parallel instead of only starting their outer cron wrappers. The shared non-cron `nested` lane is not widened by this setting.
|
||||
|
||||
The runtime state sidecar is derived from `cron.store`: a `.json` store such as `~/clawd/cron/jobs.json` uses `~/clawd/cron/jobs-state.json`, while a store path without a `.json` suffix appends `-state.json`.
|
||||
|
||||
|
||||
@@ -77,7 +77,7 @@ Defaults: `debounceMs: 1000`, `cap: 20`, `drop: summarize`.
|
||||
|
||||
- Applies to auto-reply agent runs across all inbound channels that use the gateway reply pipeline (WhatsApp web, Telegram, Slack, Discord, Signal, iMessage, webchat, etc.).
|
||||
- Default lane (`main`) is process-wide for inbound + main heartbeats; set `agents.defaults.maxConcurrent` to allow multiple sessions in parallel.
|
||||
- Additional lanes may exist (e.g. `cron`, `nested`, `subagent`) so background jobs can run in parallel without blocking inbound replies. Isolated cron agent turns hold a `cron` slot while their inner agent execution uses `nested`; both use `cron.maxConcurrentRuns`. These detached runs are tracked as [background tasks](/automation/tasks).
|
||||
- Additional lanes may exist (e.g. `cron`, `cron-nested`, `nested`, `subagent`) so background jobs can run in parallel without blocking inbound replies. Isolated cron agent turns hold a `cron` slot while their inner agent execution uses `cron-nested`; both use `cron.maxConcurrentRuns`. Shared non-cron `nested` flows keep their own lane behavior. These detached runs are tracked as [background tasks](/automation/tasks).
|
||||
- Per-session lanes guarantee that only one agent run touches a given session at a time.
|
||||
- No external dependencies or background worker threads; pure TypeScript + promises.
|
||||
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { CommandLane } from "../process/lanes.js";
|
||||
import {
|
||||
AGENT_LANE_CRON_NESTED,
|
||||
AGENT_LANE_NESTED,
|
||||
isNestedAgentLane,
|
||||
resolveCronAgentLane,
|
||||
resolveNestedAgentLane,
|
||||
resolveNestedAgentLaneForSession,
|
||||
} from "./lanes.js";
|
||||
@@ -11,14 +14,27 @@ describe("resolveNestedAgentLane", () => {
|
||||
expect(resolveNestedAgentLane()).toBe(AGENT_LANE_NESTED);
|
||||
});
|
||||
|
||||
it("moves cron lane callers onto the nested lane", () => {
|
||||
expect(resolveNestedAgentLane("cron")).toBe(AGENT_LANE_NESTED);
|
||||
expect(resolveNestedAgentLane(" cron ")).toBe(AGENT_LANE_NESTED);
|
||||
it("preserves explicit lanes", () => {
|
||||
expect(resolveNestedAgentLane("cron")).toBe(CommandLane.Cron);
|
||||
expect(resolveNestedAgentLane(" cron ")).toBe(CommandLane.Cron);
|
||||
expect(resolveNestedAgentLane("subagent")).toBe("subagent");
|
||||
expect(resolveNestedAgentLane(" custom-lane ")).toBe("custom-lane");
|
||||
});
|
||||
});
|
||||
|
||||
describe("resolveCronAgentLane", () => {
|
||||
it("defaults cron-owned runs to the cron-nested lane", () => {
|
||||
expect(resolveCronAgentLane()).toBe(AGENT_LANE_CRON_NESTED);
|
||||
});
|
||||
|
||||
it("moves cron lane callers onto the cron-nested lane", () => {
|
||||
expect(resolveCronAgentLane("cron")).toBe(AGENT_LANE_CRON_NESTED);
|
||||
expect(resolveCronAgentLane(" cron ")).toBe(AGENT_LANE_CRON_NESTED);
|
||||
});
|
||||
|
||||
it("preserves non-cron lanes", () => {
|
||||
expect(resolveNestedAgentLane("subagent")).toBe("subagent");
|
||||
expect(resolveNestedAgentLane(" custom-lane ")).toBe("custom-lane");
|
||||
expect(resolveCronAgentLane("subagent")).toBe("subagent");
|
||||
expect(resolveCronAgentLane(" custom-lane ")).toBe("custom-lane");
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -1,20 +1,29 @@
|
||||
import { CommandLane } from "../process/lanes.js";
|
||||
|
||||
export const AGENT_LANE_NESTED = CommandLane.Nested;
|
||||
export const AGENT_LANE_CRON_NESTED = CommandLane.CronNested;
|
||||
export const AGENT_LANE_SUBAGENT = CommandLane.Subagent;
|
||||
const NESTED_LANE = "nested";
|
||||
const NESTED_LANE_PREFIX = `${NESTED_LANE}:`;
|
||||
|
||||
export function resolveNestedAgentLane(lane?: string): string {
|
||||
const trimmed = lane?.trim();
|
||||
// Nested agent runs should not inherit the cron execution lane. Cron jobs
|
||||
// already occupy that lane while they dispatch inner work.
|
||||
if (!trimmed || trimmed === "cron") {
|
||||
if (!trimmed) {
|
||||
return AGENT_LANE_NESTED;
|
||||
}
|
||||
return trimmed;
|
||||
}
|
||||
|
||||
export function resolveCronAgentLane(lane?: string): string {
|
||||
const trimmed = lane?.trim();
|
||||
// Cron jobs already occupy the outer cron lane, so inner agent work needs
|
||||
// its own lane to avoid self-deadlock without widening shared nested flows.
|
||||
if (!trimmed || trimmed === CommandLane.Cron) {
|
||||
return AGENT_LANE_CRON_NESTED;
|
||||
}
|
||||
return trimmed;
|
||||
}
|
||||
|
||||
export function resolveNestedAgentLaneForSession(sessionKey: string | undefined): string {
|
||||
const trimmed = sessionKey?.trim();
|
||||
if (!trimmed) {
|
||||
|
||||
@@ -10,12 +10,12 @@ describe("resolveGlobalLane", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("maps cron lane to nested lane to prevent deadlocks", () => {
|
||||
it("maps cron lane to cron-nested lane to prevent deadlocks", () => {
|
||||
// When cron jobs trigger nested agent runs, the outer execution holds
|
||||
// the cron lane slot. Inner work must use a separate lane to avoid
|
||||
// deadlock. See: https://github.com/openclaw/openclaw/issues/44805
|
||||
for (const lane of ["cron", " cron "]) {
|
||||
expect(resolveGlobalLane(lane)).toBe(CommandLane.Nested);
|
||||
expect(resolveGlobalLane(lane)).toBe(CommandLane.CronNested);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -23,6 +23,7 @@ describe("resolveGlobalLane", () => {
|
||||
for (const [lane, expected] of [
|
||||
["main", CommandLane.Main],
|
||||
["subagent", CommandLane.Subagent],
|
||||
["cron-nested", CommandLane.CronNested],
|
||||
["nested", CommandLane.Nested],
|
||||
["custom-lane", "custom-lane"],
|
||||
[" custom ", "custom"],
|
||||
|
||||
@@ -7,9 +7,10 @@ export function resolveSessionLane(key: string) {
|
||||
|
||||
export function resolveGlobalLane(lane?: string) {
|
||||
const cleaned = lane?.trim();
|
||||
// Cron jobs hold the cron lane slot; inner operations must use nested to avoid deadlock.
|
||||
// Cron jobs hold the cron lane slot; inner operations need a dedicated lane
|
||||
// to avoid deadlock without widening shared nested flows.
|
||||
if (cleaned === CommandLane.Cron) {
|
||||
return CommandLane.Nested;
|
||||
return CommandLane.CronNested;
|
||||
}
|
||||
return cleaned ? cleaned : CommandLane.Main;
|
||||
}
|
||||
|
||||
@@ -82,15 +82,15 @@ describe("runCronIsolatedAgentTurn lane selection", () => {
|
||||
vi.resetModules();
|
||||
});
|
||||
|
||||
it("moves the cron lane to nested for embedded runs", async () => {
|
||||
it("moves the cron lane to cron-nested for embedded runs", async () => {
|
||||
await withTempCronHome(async (home) => {
|
||||
expect(await runLaneCase(home, "cron")).toBe("nested");
|
||||
expect(await runLaneCase(home, "cron")).toBe("cron-nested");
|
||||
});
|
||||
});
|
||||
|
||||
it("defaults missing lanes to nested for embedded runs", async () => {
|
||||
it("defaults missing lanes to cron-nested for embedded runs", async () => {
|
||||
await withTempCronHome(async (home) => {
|
||||
expect(await runLaneCase(home)).toBe("nested");
|
||||
expect(await runLaneCase(home)).toBe("cron-nested");
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
export { resolveFastModeState } from "../../agents/fast-mode.js";
|
||||
export { resolveNestedAgentLane } from "../../agents/lanes.js";
|
||||
export { resolveCronAgentLane } from "../../agents/lanes.js";
|
||||
export { runEmbeddedPiAgent } from "../../agents/pi-embedded.js";
|
||||
|
||||
@@ -156,7 +156,7 @@ export function createCronPromptExecutor(params: {
|
||||
);
|
||||
return result;
|
||||
}
|
||||
const { resolveFastModeState, resolveNestedAgentLane, runEmbeddedPiAgent } =
|
||||
const { resolveCronAgentLane, resolveFastModeState, runEmbeddedPiAgent } =
|
||||
await loadCronEmbeddedRuntime();
|
||||
const currentChannelId = await resolveCurrentChannelTarget({
|
||||
channel: params.messageChannel,
|
||||
@@ -183,7 +183,7 @@ export function createCronPromptExecutor(params: {
|
||||
config: params.cfgWithAgentDefaults,
|
||||
skillsSnapshot: params.skillsSnapshot,
|
||||
prompt: promptText,
|
||||
lane: resolveNestedAgentLane(params.lane),
|
||||
lane: resolveCronAgentLane(params.lane),
|
||||
provider: providerOverride,
|
||||
model: modelOverride,
|
||||
authProfileId: params.liveSelection.authProfileId,
|
||||
|
||||
@@ -73,7 +73,7 @@ export const retireSessionMcpRuntimeMock = createMock();
|
||||
|
||||
const resolveBootstrapWarningSignaturesSeenMock = createMock();
|
||||
const resolveCronStyleNowMock = createMock();
|
||||
const resolveNestedAgentLaneMock = createMock();
|
||||
const resolveCronAgentLaneMock = createMock();
|
||||
const resolveAgentTimeoutMsMock = createMock();
|
||||
const deriveSessionTotalTokensMock = createMock();
|
||||
const hasNonzeroUsageMock = createMock();
|
||||
@@ -165,7 +165,7 @@ vi.mock("./run-execution.runtime.js", () => ({
|
||||
getCliSessionId: getCliSessionIdMock,
|
||||
runCliAgent: runCliAgentMock,
|
||||
resolveFastModeState: resolveFastModeStateMock,
|
||||
resolveNestedAgentLane: resolveNestedAgentLaneMock,
|
||||
resolveCronAgentLane: resolveCronAgentLaneMock,
|
||||
LiveSessionModelSwitchError,
|
||||
runWithModelFallback: runWithModelFallbackMock,
|
||||
isCliProvider: isCliProviderMock,
|
||||
@@ -184,7 +184,7 @@ vi.mock("./run-auth-profile.runtime.js", () => ({
|
||||
|
||||
vi.mock("./run-embedded.runtime.js", () => ({
|
||||
resolveFastModeState: resolveFastModeStateMock,
|
||||
resolveNestedAgentLane: resolveNestedAgentLaneMock,
|
||||
resolveCronAgentLane: resolveCronAgentLaneMock,
|
||||
runEmbeddedPiAgent: runEmbeddedPiAgentMock,
|
||||
}));
|
||||
|
||||
@@ -339,7 +339,7 @@ function resetRunExecutionMocks(): void {
|
||||
isCliProviderMock.mockReturnValue(false);
|
||||
resolveBootstrapWarningSignaturesSeenMock.mockReturnValue(new Set());
|
||||
resolveFastModeStateMock.mockImplementation((params) => resolveFastModeStateImpl(params));
|
||||
resolveNestedAgentLaneMock.mockReturnValue(undefined);
|
||||
resolveCronAgentLaneMock.mockReturnValue(undefined);
|
||||
normalizeVerboseLevelMock.mockImplementation((value: unknown) => value ?? "off");
|
||||
resolveSessionTranscriptPathMock.mockReturnValue("/tmp/transcript.jsonl");
|
||||
registerAgentRunContextMock.mockReturnValue(undefined);
|
||||
|
||||
@@ -19,7 +19,7 @@ describe("applyGatewayLaneConcurrency", () => {
|
||||
resetCommandQueueStateForTest();
|
||||
});
|
||||
|
||||
it("applies cron maxConcurrentRuns to the nested lane used by cron agent turns", async () => {
|
||||
it("applies cron maxConcurrentRuns to the cron-nested lane used by cron agent turns", async () => {
|
||||
applyGatewayLaneConcurrency({ cron: { maxConcurrentRuns: 2 } } as OpenClawConfig);
|
||||
|
||||
let activeRuns = 0;
|
||||
@@ -40,8 +40,8 @@ describe("applyGatewayLaneConcurrency", () => {
|
||||
}
|
||||
};
|
||||
|
||||
const first = enqueueCommandInLane(CommandLane.Nested, run, { warnAfterMs: 10_000 });
|
||||
const second = enqueueCommandInLane(CommandLane.Nested, run, { warnAfterMs: 10_000 });
|
||||
const first = enqueueCommandInLane(CommandLane.CronNested, run, { warnAfterMs: 10_000 });
|
||||
const second = enqueueCommandInLane(CommandLane.CronNested, run, { warnAfterMs: 10_000 });
|
||||
const timeout = setTimeout(() => {
|
||||
bothRunsStarted.reject(
|
||||
new Error("timed out waiting for nested cron work to run in parallel"),
|
||||
@@ -57,4 +57,24 @@ describe("applyGatewayLaneConcurrency", () => {
|
||||
await Promise.all([first, second]);
|
||||
}
|
||||
});
|
||||
|
||||
it("keeps the shared nested lane at its default concurrency", async () => {
|
||||
applyGatewayLaneConcurrency({ cron: { maxConcurrentRuns: 2 } } as OpenClawConfig);
|
||||
|
||||
let startedRuns = 0;
|
||||
const releaseRuns = createDeferred<void>();
|
||||
const run = async () => {
|
||||
startedRuns += 1;
|
||||
await releaseRuns.promise;
|
||||
};
|
||||
|
||||
const first = enqueueCommandInLane(CommandLane.Nested, run, { warnAfterMs: 10_000 });
|
||||
const second = enqueueCommandInLane(CommandLane.Nested, run, { warnAfterMs: 10_000 });
|
||||
await Promise.resolve();
|
||||
|
||||
expect(startedRuns).toBe(1);
|
||||
|
||||
releaseRuns.resolve();
|
||||
await Promise.all([first, second]);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -6,8 +6,8 @@ import { CommandLane } from "../process/lanes.js";
|
||||
export function applyGatewayLaneConcurrency(cfg: OpenClawConfig) {
|
||||
const cronMaxConcurrentRuns = cfg.cron?.maxConcurrentRuns ?? 1;
|
||||
setCommandLaneConcurrency(CommandLane.Cron, cronMaxConcurrentRuns);
|
||||
// Cron isolated agent turns remap their inner LLM work to the nested lane.
|
||||
setCommandLaneConcurrency(CommandLane.Nested, cronMaxConcurrentRuns);
|
||||
// Cron isolated agent turns remap inner LLM work to this lane.
|
||||
setCommandLaneConcurrency(CommandLane.CronNested, cronMaxConcurrentRuns);
|
||||
setCommandLaneConcurrency(CommandLane.Main, resolveAgentMaxConcurrent(cfg));
|
||||
setCommandLaneConcurrency(CommandLane.Subagent, resolveSubagentMaxConcurrent(cfg));
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
export const enum CommandLane {
|
||||
Main = "main",
|
||||
Cron = "cron",
|
||||
CronNested = "cron-nested",
|
||||
Subagent = "subagent",
|
||||
Nested = "nested",
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user