fix(heartbeat): make phase scheduling active-hours-aware (#75487)

This commit is contained in:
Alex Knight
2026-05-01 20:32:42 +10:00
committed by clawsweeper
parent 0fad53a192
commit e66eb7042f
5 changed files with 432 additions and 2 deletions

View File

@@ -417,6 +417,7 @@ Docs: https://docs.openclaw.ai
- Gateway/sessions: use bounded tail reads for sessions-list transcript usage fallbacks and cap bulk title/last-message hydration, keeping large session stores responsive when rows request derived previews. Thanks @vincentkoc.
- Gateway/sessions: yield during bulk transcript title/preview hydration and copy compaction checkpoints asynchronously, keeping the Gateway event loop responsive for large session stores and large transcripts. Refs #75330 and #75414. Thanks @amknight.
- Gateway/sessions: stream bounded transcript reads for session detail, history, artifacts, compaction, and send/subscribe sequence paths so small Gateway requests no longer materialize large transcripts or OOM on oversized session logs. Thanks @vincentkoc.
- Heartbeat/scheduler: make heartbeat phase scheduling active-hours-aware so the scheduler seeks forward to the first in-window phase slot instead of arming timers for quiet-hours slots and relying solely on the runtime guard. Non-UTC `activeHours.timezone` values (e.g. `Asia/Shanghai`) now correctly influence when the next heartbeat timer fires, avoiding wasted quiet-hours ticks and long dormant gaps after gateway restarts. Fixes #75487. Thanks @amknight.
- Gateway/chat: bound chat-history transcript reads to the requested display window so large session logs no longer OOM the Gateway when clients ask for a small history page. Thanks @vincentkoc.
- BlueBubbles: detect audio attachments by Apple UTIs (`public.audio`, `public.mpeg-4-audio`, `com.apple.m4a-audio`, `com.apple.coreaudio-format`) in addition to `audio/*` MIME, so iMessage voice notes whose webhook payload only carries the UTI are now classified as audio in the inbound `<media:audio>` placeholder instead of falling through to the generic `<media:attachment>` tag. Thanks @omarshahine.
- Voice Call/Twilio: honor stored pre-connect TwiML before realtime webhook shortcuts and reject DTMF sequences outside conversation mode, so Meet PIN entry cannot be skipped or silently dropped. Thanks @donkeykong91 and @PfanP.

View File

@@ -0,0 +1,246 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../config/config.js";
import { startHeartbeatRunner } from "./heartbeat-runner.js";
import { computeNextHeartbeatPhaseDueMs, resolveHeartbeatPhaseMs } from "./heartbeat-schedule.js";
import { resetHeartbeatWakeStateForTests } from "./heartbeat-wake.js";
/**
* E2E tests for heartbeat active-hours-aware scheduling (#75487).
*
* Verifies that the scheduler seeks forward through phase-aligned slots to
* find the first one that falls within the configured activeHours window,
* rather than arming a timer for a quiet-hours slot and relying solely on
* the runtime execution guard to skip it.
*/
describe("heartbeat scheduler: activeHours-aware scheduling (#75487)", () => {
type RunOnce = Parameters<typeof startHeartbeatRunner>[0]["runOnce"];
const TEST_SCHEDULER_SEED = "heartbeat-ah-schedule-test-seed";
function useFakeHeartbeatTime(startMs: number) {
vi.useFakeTimers();
vi.setSystemTime(new Date(startMs));
}
function heartbeatConfig(overrides?: {
every?: string;
activeHours?: { start: string; end: string; timezone?: string };
userTimezone?: string;
}): OpenClawConfig {
return {
agents: {
defaults: {
heartbeat: {
every: overrides?.every ?? "4h",
...(overrides?.activeHours ? { activeHours: overrides.activeHours } : {}),
},
...(overrides?.userTimezone ? { userTimezone: overrides.userTimezone } : {}),
},
},
} as OpenClawConfig;
}
function resolveDueFromNow(nowMs: number, intervalMs: number, agentId: string) {
return computeNextHeartbeatPhaseDueMs({
nowMs,
intervalMs,
phaseMs: resolveHeartbeatPhaseMs({
schedulerSeed: TEST_SCHEDULER_SEED,
agentId,
intervalMs,
}),
});
}
afterEach(() => {
resetHeartbeatWakeStateForTests();
vi.useRealTimers();
vi.restoreAllMocks();
});
it("skips quiet-hours slots and fires at the first in-window phase slot", async () => {
// Active window: 09:0017:00 UTC. Interval: 4h.
// Start the clock at 16:30 UTC — the next raw phase slot will be computed
// from this time. For a 4h interval the phase-aligned slots repeat every
// 4h. We resolve the first raw due, assert it falls outside the active
// window, then verify the runner arms its timer for the first in-window
// slot (which must be >= 09:00 the next day).
const startMs = Date.parse("2026-06-15T16:30:00.000Z");
useFakeHeartbeatTime(startMs);
const intervalMs = 4 * 60 * 60_000;
const callTimes: number[] = [];
const runSpy: RunOnce = vi.fn().mockImplementation(async () => {
callTimes.push(Date.now());
return { status: "ran", durationMs: 1 };
});
const runner = startHeartbeatRunner({
cfg: heartbeatConfig({
every: "4h",
activeHours: { start: "09:00", end: "17:00", timezone: "UTC" },
}),
runOnce: runSpy,
stableSchedulerSeed: TEST_SCHEDULER_SEED,
});
// Compute what the raw (timezone-unaware) first due would be.
const rawDueMs = resolveDueFromNow(startMs, intervalMs, "main");
// Advance past the raw due — the scheduler should NOT fire because that
// slot falls outside active hours (it's after 17:00).
await vi.advanceTimersByTimeAsync(rawDueMs - startMs + 1);
// If the scheduler is timezone-aware, it shouldn't have fired at the raw
// quiet-hours slot. It might have already found and armed the first
// in-window slot.
// Now advance to 09:01 on the next day plus enough time for any phase
// offset — the scheduler should fire within the active window.
const nextDay0900 = Date.parse("2026-06-16T09:00:00.000Z");
const safeEndOfWindow = Date.parse("2026-06-16T17:00:00.000Z");
await vi.advanceTimersByTimeAsync(safeEndOfWindow - Date.now());
// The first call must have happened within the active window.
expect(runSpy).toHaveBeenCalled();
const firstCallTime = callTimes[0]!;
const firstCallHourUTC = new Date(firstCallTime).getUTCHours();
expect(firstCallHourUTC).toBeGreaterThanOrEqual(9);
expect(firstCallHourUTC).toBeLessThan(17);
runner.stop();
});
it("fires immediately when the first phase slot is already within active hours", async () => {
// Active window: 08:0020:00 UTC. Interval: 4h.
// Start at 10:00 UTC — the first phase slot is within active hours.
const startMs = Date.parse("2026-06-15T10:00:00.000Z");
useFakeHeartbeatTime(startMs);
const intervalMs = 4 * 60 * 60_000;
const runSpy: RunOnce = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 });
const runner = startHeartbeatRunner({
cfg: heartbeatConfig({
every: "4h",
activeHours: { start: "08:00", end: "20:00", timezone: "UTC" },
}),
runOnce: runSpy,
stableSchedulerSeed: TEST_SCHEDULER_SEED,
});
const rawDueMs = resolveDueFromNow(startMs, intervalMs, "main");
// The raw due should be within active hours — advance to it.
await vi.advanceTimersByTimeAsync(rawDueMs - startMs + 1);
expect(runSpy).toHaveBeenCalledTimes(1);
runner.stop();
});
it("seeks forward correctly with a non-UTC timezone (e.g. America/New_York)", async () => {
// Active window: 09:0017:00 America/New_York (EDT = UTC-4 in June).
// So active hours in UTC = 13:0021:00.
// Interval: 4h. Start at 21:30 UTC (17:30 ET = outside window).
const startMs = Date.parse("2026-06-15T21:30:00.000Z");
useFakeHeartbeatTime(startMs);
const intervalMs = 4 * 60 * 60_000;
const callTimes: number[] = [];
const runSpy: RunOnce = vi.fn().mockImplementation(async () => {
callTimes.push(Date.now());
return { status: "ran", durationMs: 1 };
});
const runner = startHeartbeatRunner({
cfg: heartbeatConfig({
every: "4h",
activeHours: { start: "09:00", end: "17:00", timezone: "America/New_York" },
}),
runOnce: runSpy,
stableSchedulerSeed: TEST_SCHEDULER_SEED,
});
// Advance through two full days to capture the first fire.
await vi.advanceTimersByTimeAsync(48 * 60 * 60_000);
expect(runSpy).toHaveBeenCalled();
// Verify the first call was within the ET active window.
// 09:00 ET = 13:00 UTC, 17:00 ET = 21:00 UTC (during EDT, June).
const firstCallTime = callTimes[0]!;
const firstCallHourUTC = new Date(firstCallTime).getUTCHours();
// In ET active hours → UTC 13:0021:00
expect(firstCallHourUTC).toBeGreaterThanOrEqual(13);
expect(firstCallHourUTC).toBeLessThan(21);
runner.stop();
});
it("advances to in-window slot after a quiet-hours skip during interval runs", async () => {
// Active window: 09:0017:00 UTC. Interval: 4h.
// Start at 09:00 UTC — first fire is within window.
// After the first fire, the next raw slot may fall outside the window.
// Verify the scheduler seeks forward past quiet-hours slots.
const startMs = Date.parse("2026-06-15T09:00:00.000Z");
useFakeHeartbeatTime(startMs);
const intervalMs = 4 * 60 * 60_000;
const callTimes: number[] = [];
const runSpy: RunOnce = vi.fn().mockImplementation(async () => {
callTimes.push(Date.now());
return { status: "ran", durationMs: 1 };
});
const runner = startHeartbeatRunner({
cfg: heartbeatConfig({
every: "4h",
activeHours: { start: "09:00", end: "17:00", timezone: "UTC" },
}),
runOnce: runSpy,
stableSchedulerSeed: TEST_SCHEDULER_SEED,
});
// Advance through 48 hours — collect all fire times.
await vi.advanceTimersByTimeAsync(48 * 60 * 60_000);
// Every single fire must be within 09:0017:00 UTC.
expect(callTimes.length).toBeGreaterThan(0);
for (const t of callTimes) {
const hour = new Date(t).getUTCHours();
expect(
hour,
`fire at ${new Date(t).toISOString()} is outside active window`,
).toBeGreaterThanOrEqual(9);
expect(hour, `fire at ${new Date(t).toISOString()} is outside active window`).toBeLessThan(
17,
);
}
runner.stop();
});
it("does not loop indefinitely when activeHours window is zero-width", async () => {
// start === end means never-active. The scheduler should arm at the raw
// slot (fallback) and the runtime guard will skip execution.
const startMs = Date.parse("2026-06-15T10:00:00.000Z");
useFakeHeartbeatTime(startMs);
const runSpy: RunOnce = vi.fn().mockResolvedValue({ status: "skipped", reason: "quiet-hours" });
const runner = startHeartbeatRunner({
cfg: heartbeatConfig({
every: "30m",
activeHours: { start: "12:00", end: "12:00", timezone: "UTC" },
}),
runOnce: runSpy,
stableSchedulerSeed: TEST_SCHEDULER_SEED,
});
// Advance 2 hours — the scheduler should not hang or spin.
await vi.advanceTimersByTimeAsync(2 * 60 * 60_000);
// The runner fires because start === end returns false from
// isWithinActiveHours, so the seek falls back to the raw slot.
// runOnce returns quiet-hours skip each time.
expect(runSpy).toHaveBeenCalled();
runner.stop();
});
});

View File

@@ -97,6 +97,7 @@ import {
computeNextHeartbeatPhaseDueMs,
resolveHeartbeatPhaseMs,
resolveNextHeartbeatDueMs,
seekNextActivePhaseDueMs,
} from "./heartbeat-schedule.js";
import {
isHeartbeatEnabledForAgent,
@@ -1779,8 +1780,16 @@ export function startHeartbeatRunner(opts: {
: undefined,
});
const seekActiveSlotForAgent = (agent: HeartbeatAgentState, rawDueMs: number) =>
seekNextActivePhaseDueMs({
startMs: rawDueMs,
intervalMs: agent.intervalMs,
phaseMs: agent.phaseMs,
isActive: (ms) => isWithinActiveHours(state.cfg, agent.heartbeat, ms),
});
const advanceAgentSchedule = (agent: HeartbeatAgentState, now: number, reason?: string) => {
agent.nextDueMs =
const rawDueMs =
reason === "interval"
? computeNextHeartbeatPhaseDueMs({
nowMs: now,
@@ -1790,6 +1799,7 @@ export function startHeartbeatRunner(opts: {
: // Targeted and action-driven wakes still count as a fresh heartbeat run
// for cooldown purposes, so keep the existing now + interval behavior.
now + agent.intervalMs;
agent.nextDueMs = seekActiveSlotForAgent(agent, rawDueMs);
};
// Centralized cooldown gate. Both targeted and broadcast dispatch branches
@@ -1894,7 +1904,13 @@ export function startHeartbeatRunner(opts: {
});
intervals.push(intervalMs);
const prevState = prevAgents.get(agent.agentId);
const nextDueMs = resolveNextDue(now, intervalMs, phaseMs, prevState);
const rawNextDueMs = resolveNextDue(now, intervalMs, phaseMs, prevState);
const nextDueMs = seekNextActivePhaseDueMs({
startMs: rawNextDueMs,
intervalMs,
phaseMs,
isActive: (ms) => isWithinActiveHours(cfg, agent.heartbeat, ms),
});
nextAgents.set(agent.agentId, {
agentId: agent.agentId,
heartbeat: agent.heartbeat,

View File

@@ -3,6 +3,7 @@ import {
computeNextHeartbeatPhaseDueMs,
resolveHeartbeatPhaseMs,
resolveNextHeartbeatDueMs,
seekNextActivePhaseDueMs,
} from "./heartbeat-schedule.js";
describe("heartbeat schedule helpers", () => {
@@ -61,3 +62,134 @@ describe("heartbeat schedule helpers", () => {
).toBe(nextDueMs);
});
});
describe("seekNextActivePhaseDueMs", () => {
const HOUR = 60 * 60_000;
it("returns startMs immediately when no isActive predicate is provided", () => {
const startMs = Date.parse("2026-01-01T03:00:00.000Z");
expect(
seekNextActivePhaseDueMs({
startMs,
intervalMs: 4 * HOUR,
phaseMs: 0,
}),
).toBe(startMs);
});
it("returns startMs when the first slot is already within active hours", () => {
const startMs = Date.parse("2026-01-01T10:00:00.000Z");
expect(
seekNextActivePhaseDueMs({
startMs,
intervalMs: 4 * HOUR,
phaseMs: 0,
isActive: () => true,
}),
).toBe(startMs);
});
it("skips quiet-hours slots and returns the first in-window slot", () => {
// Active window: 08:00 17:00 UTC
// Interval: 4h, start slot at 19:00 UTC (quiet hours)
// Next slots: 23:00 (quiet), 03:00 (quiet), 07:00 (quiet), 11:00 (active!)
const startMs = Date.parse("2026-01-01T19:00:00.000Z");
const intervalMs = 4 * HOUR;
const isActive = (ms: number) => {
const hour = new Date(ms).getUTCHours();
return hour >= 8 && hour < 17;
};
const result = seekNextActivePhaseDueMs({
startMs,
intervalMs,
phaseMs: 0,
isActive,
});
// 19:00 + 4h = 23:00 (skip) + 4h = 03:00 (skip) + 4h = 07:00 (skip) + 4h = 11:00
expect(result).toBe(Date.parse("2026-01-02T11:00:00.000Z"));
});
it("handles overnight active windows correctly", () => {
// Active window: 22:00 06:00 UTC (overnight)
// Interval: 4h, start slot at 10:00 UTC (quiet hours)
// Next: 14:00 (quiet), 18:00 (quiet), 22:00 (active!)
const startMs = Date.parse("2026-01-01T10:00:00.000Z");
const intervalMs = 4 * HOUR;
const isActive = (ms: number) => {
const hour = new Date(ms).getUTCHours();
return hour >= 22 || hour < 6;
};
const result = seekNextActivePhaseDueMs({
startMs,
intervalMs,
phaseMs: 0,
isActive,
});
expect(result).toBe(Date.parse("2026-01-01T22:00:00.000Z"));
});
it("falls back to startMs when no slot is active within the seek horizon", () => {
// All slots are outside active hours (isActive always returns false)
const startMs = Date.parse("2026-01-01T10:00:00.000Z");
const result = seekNextActivePhaseDueMs({
startMs,
intervalMs: 4 * HOUR,
phaseMs: 0,
isActive: () => false,
});
expect(result).toBe(startMs);
});
it("seeks across timezone-aware active hours using isWithinActiveHours semantics", () => {
// Simulate Asia/Shanghai: active 08:00-23:00 local = 00:00-15:00 UTC
// Interval: 4h, phase slot at 15:21 UTC (23:21 Shanghai = quiet)
// Next: 19:21 UTC (03:21 Shanghai = quiet), 23:21 UTC (07:21 = quiet),
// 03:21 UTC (11:21 Shanghai = active!)
const startMs = Date.parse("2026-01-01T15:21:00.000Z");
const intervalMs = 4 * HOUR;
const shanghaiOffsetMs = 8 * HOUR;
const isActive = (ms: number) => {
// Simulate Asia/Shanghai = UTC+8, active 08:00-23:00
const shanghaiMs = ms + shanghaiOffsetMs;
const shanghaiHour = new Date(shanghaiMs).getUTCHours();
return shanghaiHour >= 8 && shanghaiHour < 23;
};
const result = seekNextActivePhaseDueMs({
startMs,
intervalMs,
phaseMs: 0,
isActive,
});
// 15:21 + 4h = 19:21 (skip) + 4h = 23:21 (skip) + 4h = 03:21 UTC = 11:21 Shanghai
expect(result).toBe(Date.parse("2026-01-02T03:21:00.000Z"));
});
it("handles very short intervals efficiently", () => {
// 30-minute interval, active window 09:00-17:00
// Start at 17:00 (quiet), should find 09:00 next day
const startMs = Date.parse("2026-01-01T17:00:00.000Z");
const intervalMs = 30 * 60_000;
const isActive = (ms: number) => {
const hour = new Date(ms).getUTCHours();
return hour >= 9 && hour < 17;
};
const result = seekNextActivePhaseDueMs({
startMs,
intervalMs,
phaseMs: 0,
isActive,
});
// Should skip 32 half-hour slots (17:00 through 08:30) to reach 09:00 next day
expect(result).toBe(Date.parse("2026-01-02T09:00:00.000Z"));
});
});

View File

@@ -57,3 +57,38 @@ export function resolveNextHeartbeatDueMs(params: {
phaseMs,
});
}
/**
* Seek forward through phase-aligned slots until one falls within the active
* hours window. Returns the first in-window slot, or falls back to the raw
* next slot when no active hours are configured or no in-window slot is found
* within the seek horizon.
*
* `isActive` is a predicate that mirrors `isWithinActiveHours` — the caller
* binds the config/heartbeat so this module stays config-agnostic.
*/
const MAX_SEEK_HORIZON_MS = 7 * 24 * 60 * 60_000; // 7 days
export function seekNextActivePhaseDueMs(params: {
startMs: number;
intervalMs: number;
phaseMs: number;
isActive?: (ms: number) => boolean;
}): number {
const isActive = params.isActive;
if (!isActive) {
return params.startMs;
}
const intervalMs = Math.max(1, Math.floor(params.intervalMs));
const horizonMs = params.startMs + MAX_SEEK_HORIZON_MS;
let candidateMs = params.startMs;
while (candidateMs <= horizonMs) {
if (isActive(candidateMs)) {
return candidateMs;
}
candidateMs += intervalMs;
}
// All slots within the seek horizon fall outside active hours — return the
// raw first slot so the runtime execution guard can still gate it.
return params.startMs;
}