mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-07 07:11:06 +00:00
feat(heartbeat): add task batching support via HEARTBEAT.md
- Add parseHeartbeatTasks() to parse YAML-like task definitions - Add isTaskDue() to check if task interval has elapsed - Add heartbeatTaskState to session store for tracking last run times - Modify resolveHeartbeatRunPrompt to build batched prompts for due tasks - Update task last run times after successful heartbeat execution Implements openclaw#29570
This commit is contained in:
committed by
Peter Steinberger
parent
890de57036
commit
103bebd651
@@ -1,6 +1,13 @@
|
||||
import { parseDurationMs } from "../cli/parse-duration.js";
|
||||
import { escapeRegExp } from "../utils.js";
|
||||
import { HEARTBEAT_TOKEN } from "./tokens.js";
|
||||
|
||||
export type HeartbeatTask = {
|
||||
name: string;
|
||||
interval: string;
|
||||
prompt: string;
|
||||
};
|
||||
|
||||
// Default heartbeat prompt (used when config.agents.defaults.heartbeat.prompt is unset).
|
||||
// Keep it tight and avoid encouraging the model to invent/rehash "open loops" from prior chat context.
|
||||
export const HEARTBEAT_PROMPT =
|
||||
@@ -169,3 +176,132 @@ export function stripHeartbeatToken(
|
||||
|
||||
return { shouldSkip: false, text: rest, didStrip: true };
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse heartbeat tasks from HEARTBEAT.md content.
|
||||
* Supports YAML-like task definitions:
|
||||
*
|
||||
* tasks:
|
||||
* - name: email-check
|
||||
* interval: 30m
|
||||
* prompt: "Check for urgent unread emails"
|
||||
*/
|
||||
export function parseHeartbeatTasks(content: string): HeartbeatTask[] {
|
||||
const tasks: HeartbeatTask[] = [];
|
||||
const lines = content.split("\n");
|
||||
let inTasksBlock = false;
|
||||
|
||||
for (let i = 0; i < lines.length; i++) {
|
||||
const line = lines[i];
|
||||
const trimmed = line.trim();
|
||||
|
||||
// Detect tasks block start
|
||||
if (trimmed === "tasks:") {
|
||||
inTasksBlock = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!inTasksBlock) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// End of tasks block (either empty line or new top-level content)
|
||||
if (
|
||||
!trimmed.startsWith(" ") &&
|
||||
!trimmed.startsWith("\t") &&
|
||||
trimmed &&
|
||||
!trimmed.startsWith("-")
|
||||
) {
|
||||
inTasksBlock = false;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Parse task entry
|
||||
if (trimmed.startsWith("- name:")) {
|
||||
const name = trimmed
|
||||
.replace("- name:", "")
|
||||
.trim()
|
||||
.replace(/^["']|["']$/g, "");
|
||||
let interval = "";
|
||||
let prompt = "";
|
||||
|
||||
// Look ahead for interval and prompt
|
||||
for (let j = i + 1; j < lines.length; j++) {
|
||||
const nextLine = lines[j];
|
||||
const nextTrimmed = nextLine.trim();
|
||||
|
||||
// End of this task
|
||||
if (nextTrimmed.startsWith("- name:")) {
|
||||
break;
|
||||
}
|
||||
// End of tasks block
|
||||
if (!nextTrimmed.startsWith(" ") && !nextTrimmed.startsWith("\t") && nextTrimmed) {
|
||||
inTasksBlock = false;
|
||||
break;
|
||||
}
|
||||
|
||||
if (nextTrimmed.startsWith("interval:")) {
|
||||
interval = nextTrimmed
|
||||
.replace("interval:", "")
|
||||
.trim()
|
||||
.replace(/^["']|["']$/g, "");
|
||||
} else if (nextTrimmed.startsWith("prompt:")) {
|
||||
prompt = nextTrimmed
|
||||
.replace("prompt:", "")
|
||||
.trim()
|
||||
.replace(/^["']|["']$/g, "");
|
||||
}
|
||||
}
|
||||
|
||||
if (name && interval && prompt) {
|
||||
tasks.push({ name, interval, prompt });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return tasks;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a task is due based on its interval and last run time.
|
||||
*/
|
||||
export function isTaskDue(lastRunMs: number | undefined, interval: string, nowMs: number): boolean {
|
||||
if (lastRunMs === undefined) {
|
||||
return true; // Never run, always due
|
||||
}
|
||||
|
||||
try {
|
||||
const intervalMs = parseDurationMs(interval, { defaultUnit: "m" });
|
||||
return nowMs - lastRunMs >= intervalMs;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get or initialize last run time for a task from session store.
|
||||
*/
|
||||
export function getTaskLastRunMs(
|
||||
taskName: string,
|
||||
sessionEntry: Record<string, unknown> | undefined,
|
||||
): number | undefined {
|
||||
if (!sessionEntry?.heartbeatTaskState) {
|
||||
return undefined;
|
||||
}
|
||||
const taskState = sessionEntry.heartbeatTaskState as Record<string, number>;
|
||||
return taskState[taskName];
|
||||
}
|
||||
|
||||
/**
|
||||
* Update last run time for a task in session store.
|
||||
*/
|
||||
export function updateTaskLastRunMs(
|
||||
taskName: string,
|
||||
nowMs: number,
|
||||
sessionEntry: Record<string, unknown>,
|
||||
): void {
|
||||
if (!sessionEntry.heartbeatTaskState) {
|
||||
sessionEntry.heartbeatTaskState = {};
|
||||
}
|
||||
(sessionEntry.heartbeatTaskState as Record<string, number>)[taskName] = nowMs;
|
||||
}
|
||||
|
||||
@@ -16,8 +16,11 @@ import { resolveHeartbeatReplyPayload } from "../auto-reply/heartbeat-reply-payl
|
||||
import {
|
||||
DEFAULT_HEARTBEAT_ACK_MAX_CHARS,
|
||||
isHeartbeatContentEffectivelyEmpty,
|
||||
isTaskDue,
|
||||
parseHeartbeatTasks,
|
||||
resolveHeartbeatPrompt as resolveHeartbeatPromptText,
|
||||
stripHeartbeatToken,
|
||||
type HeartbeatTask,
|
||||
} from "../auto-reply/heartbeat.js";
|
||||
import { HEARTBEAT_TOKEN } from "../auto-reply/tokens.js";
|
||||
import type { ReplyPayload } from "../auto-reply/types.js";
|
||||
@@ -407,6 +410,8 @@ type HeartbeatPreflight = HeartbeatReasonFlags & {
|
||||
hasTaggedCronEvents: boolean;
|
||||
shouldInspectPendingEvents: boolean;
|
||||
skipReason?: HeartbeatSkipReason;
|
||||
tasks?: HeartbeatTask[];
|
||||
heartbeatFileContent?: string;
|
||||
};
|
||||
|
||||
function resolveHeartbeatReasonFlags(reason?: string): HeartbeatReasonFlags {
|
||||
@@ -459,14 +464,24 @@ async function resolveHeartbeatPreflight(params: {
|
||||
|
||||
const workspaceDir = resolveAgentWorkspaceDir(params.cfg, params.agentId);
|
||||
const heartbeatFilePath = path.join(workspaceDir, DEFAULT_HEARTBEAT_FILENAME);
|
||||
let heartbeatFileContent: string | undefined;
|
||||
try {
|
||||
const heartbeatFileContent = await fs.readFile(heartbeatFilePath, "utf-8");
|
||||
if (isHeartbeatContentEffectivelyEmpty(heartbeatFileContent)) {
|
||||
heartbeatFileContent = await fs.readFile(heartbeatFilePath, "utf-8");
|
||||
const tasks = parseHeartbeatTasks(heartbeatFileContent);
|
||||
if (isHeartbeatContentEffectivelyEmpty(heartbeatFileContent) && tasks.length === 0) {
|
||||
return {
|
||||
...basePreflight,
|
||||
skipReason: "empty-heartbeat-file",
|
||||
tasks: [],
|
||||
heartbeatFileContent,
|
||||
};
|
||||
}
|
||||
// Return tasks even if file has other content - backward compatible
|
||||
return {
|
||||
...basePreflight,
|
||||
tasks,
|
||||
heartbeatFileContent,
|
||||
};
|
||||
} catch (err: unknown) {
|
||||
if (hasErrnoCode(err, "ENOENT")) {
|
||||
// Missing HEARTBEAT.md is intentional in some setups (for example, when
|
||||
@@ -518,6 +533,34 @@ function resolveHeartbeatRunPrompt(params: {
|
||||
.map((event) => event.text);
|
||||
const hasExecCompletion = pendingEvents.some(isExecCompletionEvent);
|
||||
const hasCronEvents = cronEvents.length > 0;
|
||||
|
||||
// If tasks are defined, build a batched prompt with due tasks
|
||||
if (params.preflight.tasks && params.preflight.tasks.length > 0) {
|
||||
const tasks = params.preflight.tasks;
|
||||
const nowMs = Date.now();
|
||||
const dueTasks = tasks.filter((task) =>
|
||||
isTaskDue(
|
||||
(params.preflight.session.entry?.heartbeatTaskState as Record<string, number>)?.[task.name],
|
||||
task.interval,
|
||||
nowMs,
|
||||
),
|
||||
);
|
||||
|
||||
if (dueTasks.length > 0) {
|
||||
const taskList = dueTasks.map((task) => `- ${task.name}: ${task.prompt}`).join("\n");
|
||||
const prompt = `Run the following periodic tasks (only those due based on their intervals):
|
||||
|
||||
${taskList}
|
||||
|
||||
After completing all due tasks, reply HEARTBEAT_OK.`;
|
||||
return { prompt, hasExecCompletion: false, hasCronEvents: false };
|
||||
}
|
||||
// No tasks due - still run but with empty task list
|
||||
const prompt = `No periodic tasks are due right now. Reply HEARTBEAT_OK.`;
|
||||
return { prompt, hasExecCompletion: false, hasCronEvents: false };
|
||||
}
|
||||
|
||||
// Fallback to original behavior
|
||||
const basePrompt = hasExecCompletion
|
||||
? buildExecEventPrompt({ deliverToUser: params.canRelayToUser })
|
||||
: hasCronEvents
|
||||
@@ -654,6 +697,27 @@ export async function runHeartbeatOnce(opts: {
|
||||
canRelayToUser,
|
||||
workspaceDir,
|
||||
});
|
||||
|
||||
// Update task last run times BEFORE model runs - ensures timestamps are persisted
|
||||
// even when model completes with HEARTBEAT_OK (which triggers early return)
|
||||
if (preflight.tasks && preflight.tasks.length > 0) {
|
||||
const store = loadSessionStore(storePath);
|
||||
const current = store[sessionKey];
|
||||
if (current) {
|
||||
const taskState = (current.heartbeatTaskState as Record<string, number>) || {};
|
||||
for (const task of preflight.tasks) {
|
||||
if (isTaskDue(taskState[task.name], task.interval, startedAt)) {
|
||||
taskState[task.name] = startedAt;
|
||||
}
|
||||
}
|
||||
store[sessionKey] = {
|
||||
...current,
|
||||
heartbeatTaskState: taskState,
|
||||
};
|
||||
await saveSessionStore(storePath, store);
|
||||
}
|
||||
}
|
||||
|
||||
const ctx = {
|
||||
Body: appendCronStyleCurrentTimeLine(prompt, cfg, startedAt),
|
||||
From: sender,
|
||||
|
||||
Reference in New Issue
Block a user