diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index afa36b35c76..ffb19b2a153 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -2477,6 +2477,7 @@ export async function runEmbeddedAttempt( } return Promise.allSettled(promises).then(() => undefined); }; + let heartbeatResponseTerminated = false; abortSessionForYield = () => { yieldAbortSettled = abortActiveSession(); }; @@ -3279,6 +3280,16 @@ export async function runEmbeddedAttempt( onAssistantMessageStart: params.onAssistantMessageStart, onExecutionPhase: params.onExecutionPhase, onAgentEvent: params.onAgentEvent, + onHeartbeatToolResponse: + params.trigger === "heartbeat" + ? () => { + if (heartbeatResponseTerminated) { + return; + } + heartbeatResponseTerminated = true; + void abortActiveSession(); + } + : undefined, terminalLifecyclePhase: params.deferTerminalLifecycleEnd ? "finishing" : "end", onBeforeLifecycleTerminal: () => { // Clear embedded-run activity before emitting terminal lifecycle events so @@ -4242,6 +4253,9 @@ export async function runEmbeddedAttempt( await persistSessionsYieldContextMessage(activeSession, yieldMessage); } }); + } else if (heartbeatResponseTerminated && isRunnerAbortError(err)) { + aborted = false; + await sessionLockController.waitForSessionEvents(activeSession); } else if (isMidTurnPrecheckSignal(err)) { await sessionLockController.waitForSessionEvents(activeSession); await sessionLockController.withSessionWriteLock(() => { diff --git a/src/agents/pi-embedded-subscribe.handlers.tools.ts b/src/agents/pi-embedded-subscribe.handlers.tools.ts index e5fd46062ae..6a0e38ba90b 100644 --- a/src/agents/pi-embedded-subscribe.handlers.tools.ts +++ b/src/agents/pi-embedded-subscribe.handlers.tools.ts @@ -1143,7 +1143,11 @@ export async function handleToolExecutionEnd( if (!isToolError && toolName === HEARTBEAT_RESPONSE_TOOL_NAME) { const response = normalizeHeartbeatToolResponse(result?.details); if (response) { + const isFirstHeartbeatResponse = ctx.state.heartbeatToolResponse === undefined; ctx.state.heartbeatToolResponse = response; + if (isFirstHeartbeatResponse) { + void ctx.params.onHeartbeatToolResponse?.(response); + } } } diff --git a/src/agents/pi-embedded-subscribe.handlers.types.ts b/src/agents/pi-embedded-subscribe.handlers.types.ts index c072934456f..44a69412b9c 100644 --- a/src/agents/pi-embedded-subscribe.handlers.types.ts +++ b/src/agents/pi-embedded-subscribe.handlers.types.ts @@ -198,6 +198,7 @@ type ToolHandlerParams = Pick< | "onBlockReplyFlush" | "onAgentEvent" | "onExecutionPhase" + | "onHeartbeatToolResponse" | "onToolResult" | "sessionKey" | "sessionId" diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts index 32967cd8c36..43ebfb67ca1 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts @@ -1,5 +1,6 @@ import type { AssistantMessage } from "@earendil-works/pi-ai"; import { describe, expect, it, vi } from "vitest"; +import { HEARTBEAT_RESPONSE_TOOL_NAME } from "../auto-reply/heartbeat-tool-response.js"; import * as agentEvents from "../infra/agent-events.js"; import { THINKING_TAG_CASES, @@ -1218,4 +1219,61 @@ describe("subscribeEmbeddedPiSession", () => { replayInvalid: true, }); }); + + it("notifies the runner once when a heartbeat response tool result is recorded", async () => { + const { session, emit } = createStubSessionHarness(); + const onHeartbeatToolResponse = vi.fn(); + const subscription = subscribeEmbeddedPiSession({ + session, + runId: "run-heartbeat-terminal", + sessionKey: "agent:main:main", + onHeartbeatToolResponse, + }); + + const result = { + details: { + status: "recorded", + outcome: "no_change", + notify: false, + summary: "Nothing needs attention.", + }, + }; + emitToolRun({ + emit, + toolName: HEARTBEAT_RESPONSE_TOOL_NAME, + toolCallId: "heartbeat-1", + args: { + outcome: "no_change", + notify: false, + summary: "Nothing needs attention.", + }, + isError: false, + result, + }); + emitToolRun({ + emit, + toolName: HEARTBEAT_RESPONSE_TOOL_NAME, + toolCallId: "heartbeat-2", + args: { + outcome: "no_change", + notify: false, + summary: "Nothing needs attention.", + }, + isError: false, + result, + }); + await flushBlockReplyCallbacks(); + + expect(subscription.getHeartbeatToolResponse()).toEqual({ + outcome: "no_change", + notify: false, + summary: "Nothing needs attention.", + }); + expect(onHeartbeatToolResponse).toHaveBeenCalledTimes(1); + expect(onHeartbeatToolResponse).toHaveBeenCalledWith({ + outcome: "no_change", + notify: false, + summary: "Nothing needs attention.", + }); + }); }); diff --git a/src/agents/pi-embedded-subscribe.types.ts b/src/agents/pi-embedded-subscribe.types.ts index ebdd1c4f58e..c0392f79542 100644 --- a/src/agents/pi-embedded-subscribe.types.ts +++ b/src/agents/pi-embedded-subscribe.types.ts @@ -3,6 +3,7 @@ import type { PartialReplyPayload, SourceReplyDeliveryMode, } from "../auto-reply/get-reply-options.types.js"; +import type { HeartbeatToolResponse } from "../auto-reply/heartbeat-tool-response.js"; import type { ReplyPayload } from "../auto-reply/reply-payload.js"; import type { ReasoningLevel, ThinkLevel, VerboseLevel } from "../auto-reply/thinking.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; @@ -56,6 +57,7 @@ export type SubscribeEmbeddedPiSessionParams = { data: Record; sessionKey?: string; }) => void | Promise; + onHeartbeatToolResponse?: (response: HeartbeatToolResponse) => void | Promise; terminalLifecyclePhase?: "end" | "finishing"; /** Best-effort hook invoked immediately before the terminal lifecycle event is emitted. */ onBeforeLifecycleTerminal?: () => void | Promise; diff --git a/src/agents/tools/heartbeat-response-tool.test.ts b/src/agents/tools/heartbeat-response-tool.test.ts index be660992ff3..eb244d91030 100644 --- a/src/agents/tools/heartbeat-response-tool.test.ts +++ b/src/agents/tools/heartbeat-response-tool.test.ts @@ -53,6 +53,24 @@ describe("createHeartbeatResponseTool", () => { expect(details.summary).toBe("Nothing needs attention."); }); + it("rejects repeated heartbeat responses from the same tool instance", async () => { + const tool = createHeartbeatResponseTool(); + + await tool.execute("call-1", { + outcome: "no_change", + notify: false, + summary: "Nothing needs attention.", + }); + + await expect( + tool.execute("call-2", { + outcome: "no_change", + notify: false, + summary: "Nothing needs attention.", + }), + ).rejects.toThrow("heartbeat_respond already recorded"); + }); + it("accepts notification text and optional scheduling metadata", async () => { const tool = createHeartbeatResponseTool(); diff --git a/src/agents/tools/heartbeat-response-tool.ts b/src/agents/tools/heartbeat-response-tool.ts index 7c0039abd57..01afbe373c2 100644 --- a/src/agents/tools/heartbeat-response-tool.ts +++ b/src/agents/tools/heartbeat-response-tool.ts @@ -36,6 +36,7 @@ function readRequiredBoolean(params: Record, key: string): bool } export function createHeartbeatResponseTool(): AnyAgentTool { + let recorded = false; return { label: "Heartbeat", name: HEARTBEAT_RESPONSE_TOOL_NAME, @@ -54,6 +55,10 @@ export function createHeartbeatResponseTool(): AnyAgentTool { "Invalid heartbeat response. Provide outcome, notify, and non-empty summary.", ); } + if (recorded) { + throw new ToolInputError("heartbeat_respond already recorded for this turn"); + } + recorded = true; return jsonResult({ status: "recorded", ...response,