diff --git a/CHANGELOG.md b/CHANGELOG.md index cf33ae7d512..fe3b27bf2d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -134,6 +134,7 @@ Docs: https://docs.openclaw.ai - Providers/Google: add model-level `cacheRetention` support for direct Gemini system prompts by creating, reusing, and refreshing `cachedContents` automatically on Google AI Studio runs. (#51372) Thanks @rafaelmariano-glitch. - Windows/restart: fall back to the installed Startup-entry launcher when the scheduled task was never registered, so `/restart` can relaunch the gateway on Windows setups where `schtasks` install fell back during onboarding. (#58943) Thanks @imechZhangLY. - Exec/heartbeat: use the canonical `exec-event` wake reason for `notifyOnExit` so background exec completions still trigger follow-up turns when `HEARTBEAT.md` is empty or comments-only. (#41479) Thanks @rstar327. +- Heartbeat: skip wake delivery when the target session lane is already busy so the pending event is retried instead of getting drained too early. (#40526) Thanks @lucky7323. ## 2026.4.2 diff --git a/src/infra/heartbeat-runner.skips-busy-session-lane.test.ts b/src/infra/heartbeat-runner.skips-busy-session-lane.test.ts new file mode 100644 index 00000000000..1feed8e530b --- /dev/null +++ b/src/infra/heartbeat-runner.skips-busy-session-lane.test.ts @@ -0,0 +1,130 @@ +import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import * as replyModule from "../auto-reply/reply.js"; +import { getActivePluginRegistry, setActivePluginRegistry } from "../plugins/runtime.js"; +import { createOutboundTestPlugin, createTestRegistry } from "../test-utils/channel-plugins.js"; +import { type HeartbeatDeps, runHeartbeatOnce } from "./heartbeat-runner.js"; +import { resetSystemEventsForTest, enqueueSystemEvent } from "./system-events.js"; +import { + seedMainSessionStore, + withTempHeartbeatSandbox, +} from "./heartbeat-runner.test-utils.js"; +import type { OpenClawConfig } from "../config/config.js"; + +vi.mock("jiti", () => ({ createJiti: () => () => ({}) })); + +let previousRegistry: ReturnType | null = null; + +const noopOutbound = { + deliveryMode: "direct" as const, + sendText: async () => ({ channel: "telegram" as const, messageId: "1", chatId: "1" }), + sendMedia: async () => ({ channel: "telegram" as const, messageId: "1", chatId: "1" }), +}; + +beforeAll(() => { + previousRegistry = getActivePluginRegistry(); + const telegramPlugin = createOutboundTestPlugin({ id: "telegram", outbound: noopOutbound }); + const registry = createTestRegistry([ + { pluginId: "telegram", plugin: telegramPlugin, source: "test" }, + ]); + setActivePluginRegistry(registry); +}); + +afterAll(() => { + if (previousRegistry) setActivePluginRegistry(previousRegistry); +}); + +beforeEach(() => { + resetSystemEventsForTest(); +}); + +describe("heartbeat runner skips when target session lane is busy", () => { + it("returns requests-in-flight when session lane has queued work", async () => { + await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => { + const cfg: OpenClawConfig = { + agents: { + defaults: { + heartbeat: { every: "30m" }, + model: { primary: "test/model" }, + }, + }, + channels: { + telegram: { + enabled: true, + token: "fake", + allowFrom: ["123"], + }, + }, + } as unknown as OpenClawConfig; + + const sessionKey = await seedMainSessionStore(storePath, cfg, { + lastChannel: "telegram", + lastProvider: "telegram", + lastTo: "123", + }); + + enqueueSystemEvent("Exec completed (test-id, code 0) :: test output", { + sessionKey, + }); + + // main lane idle (0), session lane busy (1) + const getQueueSize = vi.fn((lane?: string) => { + if (!lane || lane === "main") return 0; + if (lane.startsWith("session:")) return 1; + return 0; + }); + + const result = await runHeartbeatOnce({ + cfg, + deps: { getQueueSize, nowMs: () => Date.now() } as HeartbeatDeps, + }); + + expect(result.status).toBe("skipped"); + if (result.status === "skipped") { + expect(result.reason).toBe("requests-in-flight"); + } + expect(replySpy).not.toHaveBeenCalled(); + }); + }); + + it("proceeds normally when session lane is idle", async () => { + await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => { + const cfg: OpenClawConfig = { + agents: { + defaults: { + heartbeat: { every: "30m" }, + model: { primary: "test/model" }, + }, + }, + channels: { + telegram: { + enabled: true, + token: "fake", + allowFrom: ["123"], + }, + }, + } as unknown as OpenClawConfig; + + await seedMainSessionStore(storePath, cfg, { + lastChannel: "telegram", + lastProvider: "telegram", + lastTo: "123", + }); + + // Both lanes idle + const getQueueSize = vi.fn((_lane?: string) => 0); + + replySpy.mockResolvedValue({ + text: "HEARTBEAT_OK", + model: "test/model", + } as any); + + const result = await runHeartbeatOnce({ + cfg, + deps: { getQueueSize, nowMs: () => Date.now() } as HeartbeatDeps, + }); + + expect(replySpy).toHaveBeenCalled(); + expect(result.status).toBe("ran"); + }); + }); +}); diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index 9615e7f9bfb..791102c3376 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -48,6 +48,7 @@ import { } from "../routing/session-key.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { escapeRegExp } from "../utils.js"; +import { resolveEmbeddedSessionLane } from "../agents/pi-embedded-runner.js"; import { formatErrorMessage, hasErrnoCode } from "./errors.js"; import { isWithinActiveHours } from "./heartbeat-active-hours.js"; import { @@ -634,6 +635,21 @@ export async function runHeartbeatOnce(opts: { return { status: "skipped", reason: preflight.skipReason }; } const { entry, sessionKey, storePath } = preflight.session; + + // Check the resolved session lane — if it is busy, skip to avoid interrupting + // an active streaming turn. The wake-layer retry (heartbeat-wake.ts) will + // re-schedule this wake automatically. See #14396 (closed without merge). + const sessionLaneKey = resolveEmbeddedSessionLane(sessionKey); + const sessionLaneSize = (opts.deps?.getQueueSize ?? getQueueSize)(sessionLaneKey); + if (sessionLaneSize > 0) { + emitHeartbeatEvent({ + status: "skipped", + reason: "requests-in-flight", + durationMs: Date.now() - startedAt, + }); + return { status: "skipped", reason: "requests-in-flight" }; + } + const previousUpdatedAt = entry?.updatedAt; // When isolatedSession is enabled, create a fresh session via the same