From e66eb7042f5f2b475f8b9d6300c7b4abadefe4af Mon Sep 17 00:00:00 2001 From: Alex Knight Date: Fri, 1 May 2026 20:32:42 +1000 Subject: [PATCH] fix(heartbeat): make phase scheduling active-hours-aware (#75487) --- CHANGELOG.md | 1 + ...t-runner.active-hours-schedule.e2e.test.ts | 246 ++++++++++++++++++ src/infra/heartbeat-runner.ts | 20 +- src/infra/heartbeat-schedule.test.ts | 132 ++++++++++ src/infra/heartbeat-schedule.ts | 35 +++ 5 files changed, 432 insertions(+), 2 deletions(-) create mode 100644 src/infra/heartbeat-runner.active-hours-schedule.e2e.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index ec4833d4e0b..7950b2f7c8f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -417,6 +417,7 @@ Docs: https://docs.openclaw.ai - Gateway/sessions: use bounded tail reads for sessions-list transcript usage fallbacks and cap bulk title/last-message hydration, keeping large session stores responsive when rows request derived previews. Thanks @vincentkoc. - Gateway/sessions: yield during bulk transcript title/preview hydration and copy compaction checkpoints asynchronously, keeping the Gateway event loop responsive for large session stores and large transcripts. Refs #75330 and #75414. Thanks @amknight. - Gateway/sessions: stream bounded transcript reads for session detail, history, artifacts, compaction, and send/subscribe sequence paths so small Gateway requests no longer materialize large transcripts or OOM on oversized session logs. Thanks @vincentkoc. +- Heartbeat/scheduler: make heartbeat phase scheduling active-hours-aware so the scheduler seeks forward to the first in-window phase slot instead of arming timers for quiet-hours slots and relying solely on the runtime guard. Non-UTC `activeHours.timezone` values (e.g. `Asia/Shanghai`) now correctly influence when the next heartbeat timer fires, avoiding wasted quiet-hours ticks and long dormant gaps after gateway restarts. Fixes #75487. Thanks @amknight. - Gateway/chat: bound chat-history transcript reads to the requested display window so large session logs no longer OOM the Gateway when clients ask for a small history page. Thanks @vincentkoc. - BlueBubbles: detect audio attachments by Apple UTIs (`public.audio`, `public.mpeg-4-audio`, `com.apple.m4a-audio`, `com.apple.coreaudio-format`) in addition to `audio/*` MIME, so iMessage voice notes whose webhook payload only carries the UTI are now classified as audio in the inbound `` placeholder instead of falling through to the generic `` tag. Thanks @omarshahine. - Voice Call/Twilio: honor stored pre-connect TwiML before realtime webhook shortcuts and reject DTMF sequences outside conversation mode, so Meet PIN entry cannot be skipped or silently dropped. Thanks @donkeykong91 and @PfanP. diff --git a/src/infra/heartbeat-runner.active-hours-schedule.e2e.test.ts b/src/infra/heartbeat-runner.active-hours-schedule.e2e.test.ts new file mode 100644 index 00000000000..73591e09cdb --- /dev/null +++ b/src/infra/heartbeat-runner.active-hours-schedule.e2e.test.ts @@ -0,0 +1,246 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import type { OpenClawConfig } from "../config/config.js"; +import { startHeartbeatRunner } from "./heartbeat-runner.js"; +import { computeNextHeartbeatPhaseDueMs, resolveHeartbeatPhaseMs } from "./heartbeat-schedule.js"; +import { resetHeartbeatWakeStateForTests } from "./heartbeat-wake.js"; + +/** + * E2E tests for heartbeat active-hours-aware scheduling (#75487). + * + * Verifies that the scheduler seeks forward through phase-aligned slots to + * find the first one that falls within the configured activeHours window, + * rather than arming a timer for a quiet-hours slot and relying solely on + * the runtime execution guard to skip it. + */ +describe("heartbeat scheduler: activeHours-aware scheduling (#75487)", () => { + type RunOnce = Parameters[0]["runOnce"]; + const TEST_SCHEDULER_SEED = "heartbeat-ah-schedule-test-seed"; + + function useFakeHeartbeatTime(startMs: number) { + vi.useFakeTimers(); + vi.setSystemTime(new Date(startMs)); + } + + function heartbeatConfig(overrides?: { + every?: string; + activeHours?: { start: string; end: string; timezone?: string }; + userTimezone?: string; + }): OpenClawConfig { + return { + agents: { + defaults: { + heartbeat: { + every: overrides?.every ?? "4h", + ...(overrides?.activeHours ? { activeHours: overrides.activeHours } : {}), + }, + ...(overrides?.userTimezone ? { userTimezone: overrides.userTimezone } : {}), + }, + }, + } as OpenClawConfig; + } + + function resolveDueFromNow(nowMs: number, intervalMs: number, agentId: string) { + return computeNextHeartbeatPhaseDueMs({ + nowMs, + intervalMs, + phaseMs: resolveHeartbeatPhaseMs({ + schedulerSeed: TEST_SCHEDULER_SEED, + agentId, + intervalMs, + }), + }); + } + + afterEach(() => { + resetHeartbeatWakeStateForTests(); + vi.useRealTimers(); + vi.restoreAllMocks(); + }); + + it("skips quiet-hours slots and fires at the first in-window phase slot", async () => { + // Active window: 09:00–17:00 UTC. Interval: 4h. + // Start the clock at 16:30 UTC — the next raw phase slot will be computed + // from this time. For a 4h interval the phase-aligned slots repeat every + // 4h. We resolve the first raw due, assert it falls outside the active + // window, then verify the runner arms its timer for the first in-window + // slot (which must be >= 09:00 the next day). + const startMs = Date.parse("2026-06-15T16:30:00.000Z"); + useFakeHeartbeatTime(startMs); + + const intervalMs = 4 * 60 * 60_000; + const callTimes: number[] = []; + const runSpy: RunOnce = vi.fn().mockImplementation(async () => { + callTimes.push(Date.now()); + return { status: "ran", durationMs: 1 }; + }); + + const runner = startHeartbeatRunner({ + cfg: heartbeatConfig({ + every: "4h", + activeHours: { start: "09:00", end: "17:00", timezone: "UTC" }, + }), + runOnce: runSpy, + stableSchedulerSeed: TEST_SCHEDULER_SEED, + }); + + // Compute what the raw (timezone-unaware) first due would be. + const rawDueMs = resolveDueFromNow(startMs, intervalMs, "main"); + + // Advance past the raw due — the scheduler should NOT fire because that + // slot falls outside active hours (it's after 17:00). + await vi.advanceTimersByTimeAsync(rawDueMs - startMs + 1); + // If the scheduler is timezone-aware, it shouldn't have fired at the raw + // quiet-hours slot. It might have already found and armed the first + // in-window slot. + + // Now advance to 09:01 on the next day plus enough time for any phase + // offset — the scheduler should fire within the active window. + const nextDay0900 = Date.parse("2026-06-16T09:00:00.000Z"); + const safeEndOfWindow = Date.parse("2026-06-16T17:00:00.000Z"); + await vi.advanceTimersByTimeAsync(safeEndOfWindow - Date.now()); + + // The first call must have happened within the active window. + expect(runSpy).toHaveBeenCalled(); + const firstCallTime = callTimes[0]!; + const firstCallHourUTC = new Date(firstCallTime).getUTCHours(); + expect(firstCallHourUTC).toBeGreaterThanOrEqual(9); + expect(firstCallHourUTC).toBeLessThan(17); + + runner.stop(); + }); + + it("fires immediately when the first phase slot is already within active hours", async () => { + // Active window: 08:00–20:00 UTC. Interval: 4h. + // Start at 10:00 UTC — the first phase slot is within active hours. + const startMs = Date.parse("2026-06-15T10:00:00.000Z"); + useFakeHeartbeatTime(startMs); + + const intervalMs = 4 * 60 * 60_000; + const runSpy: RunOnce = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 }); + + const runner = startHeartbeatRunner({ + cfg: heartbeatConfig({ + every: "4h", + activeHours: { start: "08:00", end: "20:00", timezone: "UTC" }, + }), + runOnce: runSpy, + stableSchedulerSeed: TEST_SCHEDULER_SEED, + }); + + const rawDueMs = resolveDueFromNow(startMs, intervalMs, "main"); + + // The raw due should be within active hours — advance to it. + await vi.advanceTimersByTimeAsync(rawDueMs - startMs + 1); + + expect(runSpy).toHaveBeenCalledTimes(1); + runner.stop(); + }); + + it("seeks forward correctly with a non-UTC timezone (e.g. America/New_York)", async () => { + // Active window: 09:00–17:00 America/New_York (EDT = UTC-4 in June). + // So active hours in UTC = 13:00–21:00. + // Interval: 4h. Start at 21:30 UTC (17:30 ET = outside window). + const startMs = Date.parse("2026-06-15T21:30:00.000Z"); + useFakeHeartbeatTime(startMs); + + const intervalMs = 4 * 60 * 60_000; + const callTimes: number[] = []; + const runSpy: RunOnce = vi.fn().mockImplementation(async () => { + callTimes.push(Date.now()); + return { status: "ran", durationMs: 1 }; + }); + + const runner = startHeartbeatRunner({ + cfg: heartbeatConfig({ + every: "4h", + activeHours: { start: "09:00", end: "17:00", timezone: "America/New_York" }, + }), + runOnce: runSpy, + stableSchedulerSeed: TEST_SCHEDULER_SEED, + }); + + // Advance through two full days to capture the first fire. + await vi.advanceTimersByTimeAsync(48 * 60 * 60_000); + + expect(runSpy).toHaveBeenCalled(); + // Verify the first call was within the ET active window. + // 09:00 ET = 13:00 UTC, 17:00 ET = 21:00 UTC (during EDT, June). + const firstCallTime = callTimes[0]!; + const firstCallHourUTC = new Date(firstCallTime).getUTCHours(); + // In ET active hours → UTC 13:00–21:00 + expect(firstCallHourUTC).toBeGreaterThanOrEqual(13); + expect(firstCallHourUTC).toBeLessThan(21); + + runner.stop(); + }); + + it("advances to in-window slot after a quiet-hours skip during interval runs", async () => { + // Active window: 09:00–17:00 UTC. Interval: 4h. + // Start at 09:00 UTC — first fire is within window. + // After the first fire, the next raw slot may fall outside the window. + // Verify the scheduler seeks forward past quiet-hours slots. + const startMs = Date.parse("2026-06-15T09:00:00.000Z"); + useFakeHeartbeatTime(startMs); + + const intervalMs = 4 * 60 * 60_000; + const callTimes: number[] = []; + const runSpy: RunOnce = vi.fn().mockImplementation(async () => { + callTimes.push(Date.now()); + return { status: "ran", durationMs: 1 }; + }); + + const runner = startHeartbeatRunner({ + cfg: heartbeatConfig({ + every: "4h", + activeHours: { start: "09:00", end: "17:00", timezone: "UTC" }, + }), + runOnce: runSpy, + stableSchedulerSeed: TEST_SCHEDULER_SEED, + }); + + // Advance through 48 hours — collect all fire times. + await vi.advanceTimersByTimeAsync(48 * 60 * 60_000); + + // Every single fire must be within 09:00–17:00 UTC. + expect(callTimes.length).toBeGreaterThan(0); + for (const t of callTimes) { + const hour = new Date(t).getUTCHours(); + expect( + hour, + `fire at ${new Date(t).toISOString()} is outside active window`, + ).toBeGreaterThanOrEqual(9); + expect(hour, `fire at ${new Date(t).toISOString()} is outside active window`).toBeLessThan( + 17, + ); + } + + runner.stop(); + }); + + it("does not loop indefinitely when activeHours window is zero-width", async () => { + // start === end means never-active. The scheduler should arm at the raw + // slot (fallback) and the runtime guard will skip execution. + const startMs = Date.parse("2026-06-15T10:00:00.000Z"); + useFakeHeartbeatTime(startMs); + + const runSpy: RunOnce = vi.fn().mockResolvedValue({ status: "skipped", reason: "quiet-hours" }); + + const runner = startHeartbeatRunner({ + cfg: heartbeatConfig({ + every: "30m", + activeHours: { start: "12:00", end: "12:00", timezone: "UTC" }, + }), + runOnce: runSpy, + stableSchedulerSeed: TEST_SCHEDULER_SEED, + }); + + // Advance 2 hours — the scheduler should not hang or spin. + await vi.advanceTimersByTimeAsync(2 * 60 * 60_000); + + // The runner fires because start === end returns false from + // isWithinActiveHours, so the seek falls back to the raw slot. + // runOnce returns quiet-hours skip each time. + expect(runSpy).toHaveBeenCalled(); + runner.stop(); + }); +}); diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index b23d704f194..d635c783755 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -97,6 +97,7 @@ import { computeNextHeartbeatPhaseDueMs, resolveHeartbeatPhaseMs, resolveNextHeartbeatDueMs, + seekNextActivePhaseDueMs, } from "./heartbeat-schedule.js"; import { isHeartbeatEnabledForAgent, @@ -1779,8 +1780,16 @@ export function startHeartbeatRunner(opts: { : undefined, }); + const seekActiveSlotForAgent = (agent: HeartbeatAgentState, rawDueMs: number) => + seekNextActivePhaseDueMs({ + startMs: rawDueMs, + intervalMs: agent.intervalMs, + phaseMs: agent.phaseMs, + isActive: (ms) => isWithinActiveHours(state.cfg, agent.heartbeat, ms), + }); + const advanceAgentSchedule = (agent: HeartbeatAgentState, now: number, reason?: string) => { - agent.nextDueMs = + const rawDueMs = reason === "interval" ? computeNextHeartbeatPhaseDueMs({ nowMs: now, @@ -1790,6 +1799,7 @@ export function startHeartbeatRunner(opts: { : // Targeted and action-driven wakes still count as a fresh heartbeat run // for cooldown purposes, so keep the existing now + interval behavior. now + agent.intervalMs; + agent.nextDueMs = seekActiveSlotForAgent(agent, rawDueMs); }; // Centralized cooldown gate. Both targeted and broadcast dispatch branches @@ -1894,7 +1904,13 @@ export function startHeartbeatRunner(opts: { }); intervals.push(intervalMs); const prevState = prevAgents.get(agent.agentId); - const nextDueMs = resolveNextDue(now, intervalMs, phaseMs, prevState); + const rawNextDueMs = resolveNextDue(now, intervalMs, phaseMs, prevState); + const nextDueMs = seekNextActivePhaseDueMs({ + startMs: rawNextDueMs, + intervalMs, + phaseMs, + isActive: (ms) => isWithinActiveHours(cfg, agent.heartbeat, ms), + }); nextAgents.set(agent.agentId, { agentId: agent.agentId, heartbeat: agent.heartbeat, diff --git a/src/infra/heartbeat-schedule.test.ts b/src/infra/heartbeat-schedule.test.ts index c4d848f0fdf..1fc085785e5 100644 --- a/src/infra/heartbeat-schedule.test.ts +++ b/src/infra/heartbeat-schedule.test.ts @@ -3,6 +3,7 @@ import { computeNextHeartbeatPhaseDueMs, resolveHeartbeatPhaseMs, resolveNextHeartbeatDueMs, + seekNextActivePhaseDueMs, } from "./heartbeat-schedule.js"; describe("heartbeat schedule helpers", () => { @@ -61,3 +62,134 @@ describe("heartbeat schedule helpers", () => { ).toBe(nextDueMs); }); }); + +describe("seekNextActivePhaseDueMs", () => { + const HOUR = 60 * 60_000; + + it("returns startMs immediately when no isActive predicate is provided", () => { + const startMs = Date.parse("2026-01-01T03:00:00.000Z"); + expect( + seekNextActivePhaseDueMs({ + startMs, + intervalMs: 4 * HOUR, + phaseMs: 0, + }), + ).toBe(startMs); + }); + + it("returns startMs when the first slot is already within active hours", () => { + const startMs = Date.parse("2026-01-01T10:00:00.000Z"); + expect( + seekNextActivePhaseDueMs({ + startMs, + intervalMs: 4 * HOUR, + phaseMs: 0, + isActive: () => true, + }), + ).toBe(startMs); + }); + + it("skips quiet-hours slots and returns the first in-window slot", () => { + // Active window: 08:00 – 17:00 UTC + // Interval: 4h, start slot at 19:00 UTC (quiet hours) + // Next slots: 23:00 (quiet), 03:00 (quiet), 07:00 (quiet), 11:00 (active!) + const startMs = Date.parse("2026-01-01T19:00:00.000Z"); + const intervalMs = 4 * HOUR; + const isActive = (ms: number) => { + const hour = new Date(ms).getUTCHours(); + return hour >= 8 && hour < 17; + }; + + const result = seekNextActivePhaseDueMs({ + startMs, + intervalMs, + phaseMs: 0, + isActive, + }); + + // 19:00 + 4h = 23:00 (skip) + 4h = 03:00 (skip) + 4h = 07:00 (skip) + 4h = 11:00 + expect(result).toBe(Date.parse("2026-01-02T11:00:00.000Z")); + }); + + it("handles overnight active windows correctly", () => { + // Active window: 22:00 – 06:00 UTC (overnight) + // Interval: 4h, start slot at 10:00 UTC (quiet hours) + // Next: 14:00 (quiet), 18:00 (quiet), 22:00 (active!) + const startMs = Date.parse("2026-01-01T10:00:00.000Z"); + const intervalMs = 4 * HOUR; + const isActive = (ms: number) => { + const hour = new Date(ms).getUTCHours(); + return hour >= 22 || hour < 6; + }; + + const result = seekNextActivePhaseDueMs({ + startMs, + intervalMs, + phaseMs: 0, + isActive, + }); + + expect(result).toBe(Date.parse("2026-01-01T22:00:00.000Z")); + }); + + it("falls back to startMs when no slot is active within the seek horizon", () => { + // All slots are outside active hours (isActive always returns false) + const startMs = Date.parse("2026-01-01T10:00:00.000Z"); + const result = seekNextActivePhaseDueMs({ + startMs, + intervalMs: 4 * HOUR, + phaseMs: 0, + isActive: () => false, + }); + + expect(result).toBe(startMs); + }); + + it("seeks across timezone-aware active hours using isWithinActiveHours semantics", () => { + // Simulate Asia/Shanghai: active 08:00-23:00 local = 00:00-15:00 UTC + // Interval: 4h, phase slot at 15:21 UTC (23:21 Shanghai = quiet) + // Next: 19:21 UTC (03:21 Shanghai = quiet), 23:21 UTC (07:21 = quiet), + // 03:21 UTC (11:21 Shanghai = active!) + const startMs = Date.parse("2026-01-01T15:21:00.000Z"); + const intervalMs = 4 * HOUR; + const shanghaiOffsetMs = 8 * HOUR; + + const isActive = (ms: number) => { + // Simulate Asia/Shanghai = UTC+8, active 08:00-23:00 + const shanghaiMs = ms + shanghaiOffsetMs; + const shanghaiHour = new Date(shanghaiMs).getUTCHours(); + return shanghaiHour >= 8 && shanghaiHour < 23; + }; + + const result = seekNextActivePhaseDueMs({ + startMs, + intervalMs, + phaseMs: 0, + isActive, + }); + + // 15:21 + 4h = 19:21 (skip) + 4h = 23:21 (skip) + 4h = 03:21 UTC = 11:21 Shanghai + expect(result).toBe(Date.parse("2026-01-02T03:21:00.000Z")); + }); + + it("handles very short intervals efficiently", () => { + // 30-minute interval, active window 09:00-17:00 + // Start at 17:00 (quiet), should find 09:00 next day + const startMs = Date.parse("2026-01-01T17:00:00.000Z"); + const intervalMs = 30 * 60_000; + const isActive = (ms: number) => { + const hour = new Date(ms).getUTCHours(); + return hour >= 9 && hour < 17; + }; + + const result = seekNextActivePhaseDueMs({ + startMs, + intervalMs, + phaseMs: 0, + isActive, + }); + + // Should skip 32 half-hour slots (17:00 through 08:30) to reach 09:00 next day + expect(result).toBe(Date.parse("2026-01-02T09:00:00.000Z")); + }); +}); diff --git a/src/infra/heartbeat-schedule.ts b/src/infra/heartbeat-schedule.ts index 65b2fc4cd0e..5b432476a1b 100644 --- a/src/infra/heartbeat-schedule.ts +++ b/src/infra/heartbeat-schedule.ts @@ -57,3 +57,38 @@ export function resolveNextHeartbeatDueMs(params: { phaseMs, }); } + +/** + * Seek forward through phase-aligned slots until one falls within the active + * hours window. Returns the first in-window slot, or falls back to the raw + * next slot when no active hours are configured or no in-window slot is found + * within the seek horizon. + * + * `isActive` is a predicate that mirrors `isWithinActiveHours` — the caller + * binds the config/heartbeat so this module stays config-agnostic. + */ +const MAX_SEEK_HORIZON_MS = 7 * 24 * 60 * 60_000; // 7 days + +export function seekNextActivePhaseDueMs(params: { + startMs: number; + intervalMs: number; + phaseMs: number; + isActive?: (ms: number) => boolean; +}): number { + const isActive = params.isActive; + if (!isActive) { + return params.startMs; + } + const intervalMs = Math.max(1, Math.floor(params.intervalMs)); + const horizonMs = params.startMs + MAX_SEEK_HORIZON_MS; + let candidateMs = params.startMs; + while (candidateMs <= horizonMs) { + if (isActive(candidateMs)) { + return candidateMs; + } + candidateMs += intervalMs; + } + // All slots within the seek horizon fall outside active hours — return the + // raw first slot so the runtime execution guard can still gate it. + return params.startMs; +}