mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-11 09:11:13 +00:00
fix(heartbeat): skip busy session lane wake delivery (#40526)
Co-authored-by: Vincent Koc <vincentkoc@ieee.org>
This commit is contained in:
@@ -134,6 +134,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Providers/Google: add model-level `cacheRetention` support for direct Gemini system prompts by creating, reusing, and refreshing `cachedContents` automatically on Google AI Studio runs. (#51372) Thanks @rafaelmariano-glitch.
|
||||
- Windows/restart: fall back to the installed Startup-entry launcher when the scheduled task was never registered, so `/restart` can relaunch the gateway on Windows setups where `schtasks` install fell back during onboarding. (#58943) Thanks @imechZhangLY.
|
||||
- Exec/heartbeat: use the canonical `exec-event` wake reason for `notifyOnExit` so background exec completions still trigger follow-up turns when `HEARTBEAT.md` is empty or comments-only. (#41479) Thanks @rstar327.
|
||||
- Heartbeat: skip wake delivery when the target session lane is already busy so the pending event is retried instead of getting drained too early. (#40526) Thanks @lucky7323.
|
||||
|
||||
## 2026.4.2
|
||||
|
||||
|
||||
130
src/infra/heartbeat-runner.skips-busy-session-lane.test.ts
Normal file
130
src/infra/heartbeat-runner.skips-busy-session-lane.test.ts
Normal file
@@ -0,0 +1,130 @@
|
||||
import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import * as replyModule from "../auto-reply/reply.js";
|
||||
import { getActivePluginRegistry, setActivePluginRegistry } from "../plugins/runtime.js";
|
||||
import { createOutboundTestPlugin, createTestRegistry } from "../test-utils/channel-plugins.js";
|
||||
import { type HeartbeatDeps, runHeartbeatOnce } from "./heartbeat-runner.js";
|
||||
import { resetSystemEventsForTest, enqueueSystemEvent } from "./system-events.js";
|
||||
import {
|
||||
seedMainSessionStore,
|
||||
withTempHeartbeatSandbox,
|
||||
} from "./heartbeat-runner.test-utils.js";
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
|
||||
vi.mock("jiti", () => ({ createJiti: () => () => ({}) }));
|
||||
|
||||
let previousRegistry: ReturnType<typeof getActivePluginRegistry> | null = null;
|
||||
|
||||
const noopOutbound = {
|
||||
deliveryMode: "direct" as const,
|
||||
sendText: async () => ({ channel: "telegram" as const, messageId: "1", chatId: "1" }),
|
||||
sendMedia: async () => ({ channel: "telegram" as const, messageId: "1", chatId: "1" }),
|
||||
};
|
||||
|
||||
beforeAll(() => {
|
||||
previousRegistry = getActivePluginRegistry();
|
||||
const telegramPlugin = createOutboundTestPlugin({ id: "telegram", outbound: noopOutbound });
|
||||
const registry = createTestRegistry([
|
||||
{ pluginId: "telegram", plugin: telegramPlugin, source: "test" },
|
||||
]);
|
||||
setActivePluginRegistry(registry);
|
||||
});
|
||||
|
||||
afterAll(() => {
|
||||
if (previousRegistry) setActivePluginRegistry(previousRegistry);
|
||||
});
|
||||
|
||||
beforeEach(() => {
|
||||
resetSystemEventsForTest();
|
||||
});
|
||||
|
||||
describe("heartbeat runner skips when target session lane is busy", () => {
|
||||
it("returns requests-in-flight when session lane has queued work", async () => {
|
||||
await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => {
|
||||
const cfg: OpenClawConfig = {
|
||||
agents: {
|
||||
defaults: {
|
||||
heartbeat: { every: "30m" },
|
||||
model: { primary: "test/model" },
|
||||
},
|
||||
},
|
||||
channels: {
|
||||
telegram: {
|
||||
enabled: true,
|
||||
token: "fake",
|
||||
allowFrom: ["123"],
|
||||
},
|
||||
},
|
||||
} as unknown as OpenClawConfig;
|
||||
|
||||
const sessionKey = await seedMainSessionStore(storePath, cfg, {
|
||||
lastChannel: "telegram",
|
||||
lastProvider: "telegram",
|
||||
lastTo: "123",
|
||||
});
|
||||
|
||||
enqueueSystemEvent("Exec completed (test-id, code 0) :: test output", {
|
||||
sessionKey,
|
||||
});
|
||||
|
||||
// main lane idle (0), session lane busy (1)
|
||||
const getQueueSize = vi.fn((lane?: string) => {
|
||||
if (!lane || lane === "main") return 0;
|
||||
if (lane.startsWith("session:")) return 1;
|
||||
return 0;
|
||||
});
|
||||
|
||||
const result = await runHeartbeatOnce({
|
||||
cfg,
|
||||
deps: { getQueueSize, nowMs: () => Date.now() } as HeartbeatDeps,
|
||||
});
|
||||
|
||||
expect(result.status).toBe("skipped");
|
||||
if (result.status === "skipped") {
|
||||
expect(result.reason).toBe("requests-in-flight");
|
||||
}
|
||||
expect(replySpy).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
it("proceeds normally when session lane is idle", async () => {
|
||||
await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => {
|
||||
const cfg: OpenClawConfig = {
|
||||
agents: {
|
||||
defaults: {
|
||||
heartbeat: { every: "30m" },
|
||||
model: { primary: "test/model" },
|
||||
},
|
||||
},
|
||||
channels: {
|
||||
telegram: {
|
||||
enabled: true,
|
||||
token: "fake",
|
||||
allowFrom: ["123"],
|
||||
},
|
||||
},
|
||||
} as unknown as OpenClawConfig;
|
||||
|
||||
await seedMainSessionStore(storePath, cfg, {
|
||||
lastChannel: "telegram",
|
||||
lastProvider: "telegram",
|
||||
lastTo: "123",
|
||||
});
|
||||
|
||||
// Both lanes idle
|
||||
const getQueueSize = vi.fn((_lane?: string) => 0);
|
||||
|
||||
replySpy.mockResolvedValue({
|
||||
text: "HEARTBEAT_OK",
|
||||
model: "test/model",
|
||||
} as any);
|
||||
|
||||
const result = await runHeartbeatOnce({
|
||||
cfg,
|
||||
deps: { getQueueSize, nowMs: () => Date.now() } as HeartbeatDeps,
|
||||
});
|
||||
|
||||
expect(replySpy).toHaveBeenCalled();
|
||||
expect(result.status).toBe("ran");
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -48,6 +48,7 @@ import {
|
||||
} from "../routing/session-key.js";
|
||||
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
|
||||
import { escapeRegExp } from "../utils.js";
|
||||
import { resolveEmbeddedSessionLane } from "../agents/pi-embedded-runner.js";
|
||||
import { formatErrorMessage, hasErrnoCode } from "./errors.js";
|
||||
import { isWithinActiveHours } from "./heartbeat-active-hours.js";
|
||||
import {
|
||||
@@ -634,6 +635,21 @@ export async function runHeartbeatOnce(opts: {
|
||||
return { status: "skipped", reason: preflight.skipReason };
|
||||
}
|
||||
const { entry, sessionKey, storePath } = preflight.session;
|
||||
|
||||
// Check the resolved session lane — if it is busy, skip to avoid interrupting
|
||||
// an active streaming turn. The wake-layer retry (heartbeat-wake.ts) will
|
||||
// re-schedule this wake automatically. See #14396 (closed without merge).
|
||||
const sessionLaneKey = resolveEmbeddedSessionLane(sessionKey);
|
||||
const sessionLaneSize = (opts.deps?.getQueueSize ?? getQueueSize)(sessionLaneKey);
|
||||
if (sessionLaneSize > 0) {
|
||||
emitHeartbeatEvent({
|
||||
status: "skipped",
|
||||
reason: "requests-in-flight",
|
||||
durationMs: Date.now() - startedAt,
|
||||
});
|
||||
return { status: "skipped", reason: "requests-in-flight" };
|
||||
}
|
||||
|
||||
const previousUpdatedAt = entry?.updatedAt;
|
||||
|
||||
// When isolatedSession is enabled, create a fresh session via the same
|
||||
|
||||
Reference in New Issue
Block a user