Fix heartbeat response loop guard (#86324) (#86357)

This commit is contained in:
Zennn
2026-05-25 06:00:29 -04:00
committed by GitHub
parent 8b42771aab
commit e2c174e8c8
7 changed files with 102 additions and 0 deletions

View File

@@ -2477,6 +2477,7 @@ export async function runEmbeddedAttempt(
}
return Promise.allSettled(promises).then(() => undefined);
};
let heartbeatResponseTerminated = false;
abortSessionForYield = () => {
yieldAbortSettled = abortActiveSession();
};
@@ -3279,6 +3280,16 @@ export async function runEmbeddedAttempt(
onAssistantMessageStart: params.onAssistantMessageStart,
onExecutionPhase: params.onExecutionPhase,
onAgentEvent: params.onAgentEvent,
onHeartbeatToolResponse:
params.trigger === "heartbeat"
? () => {
if (heartbeatResponseTerminated) {
return;
}
heartbeatResponseTerminated = true;
void abortActiveSession();
}
: undefined,
terminalLifecyclePhase: params.deferTerminalLifecycleEnd ? "finishing" : "end",
onBeforeLifecycleTerminal: () => {
// Clear embedded-run activity before emitting terminal lifecycle events so
@@ -4242,6 +4253,9 @@ export async function runEmbeddedAttempt(
await persistSessionsYieldContextMessage(activeSession, yieldMessage);
}
});
} else if (heartbeatResponseTerminated && isRunnerAbortError(err)) {
aborted = false;
await sessionLockController.waitForSessionEvents(activeSession);
} else if (isMidTurnPrecheckSignal(err)) {
await sessionLockController.waitForSessionEvents(activeSession);
await sessionLockController.withSessionWriteLock(() => {

View File

@@ -1143,7 +1143,11 @@ export async function handleToolExecutionEnd(
if (!isToolError && toolName === HEARTBEAT_RESPONSE_TOOL_NAME) {
const response = normalizeHeartbeatToolResponse(result?.details);
if (response) {
const isFirstHeartbeatResponse = ctx.state.heartbeatToolResponse === undefined;
ctx.state.heartbeatToolResponse = response;
if (isFirstHeartbeatResponse) {
void ctx.params.onHeartbeatToolResponse?.(response);
}
}
}

View File

@@ -198,6 +198,7 @@ type ToolHandlerParams = Pick<
| "onBlockReplyFlush"
| "onAgentEvent"
| "onExecutionPhase"
| "onHeartbeatToolResponse"
| "onToolResult"
| "sessionKey"
| "sessionId"

View File

@@ -1,5 +1,6 @@
import type { AssistantMessage } from "@earendil-works/pi-ai";
import { describe, expect, it, vi } from "vitest";
import { HEARTBEAT_RESPONSE_TOOL_NAME } from "../auto-reply/heartbeat-tool-response.js";
import * as agentEvents from "../infra/agent-events.js";
import {
THINKING_TAG_CASES,
@@ -1218,4 +1219,61 @@ describe("subscribeEmbeddedPiSession", () => {
replayInvalid: true,
});
});
it("notifies the runner once when a heartbeat response tool result is recorded", async () => {
const { session, emit } = createStubSessionHarness();
const onHeartbeatToolResponse = vi.fn();
const subscription = subscribeEmbeddedPiSession({
session,
runId: "run-heartbeat-terminal",
sessionKey: "agent:main:main",
onHeartbeatToolResponse,
});
const result = {
details: {
status: "recorded",
outcome: "no_change",
notify: false,
summary: "Nothing needs attention.",
},
};
emitToolRun({
emit,
toolName: HEARTBEAT_RESPONSE_TOOL_NAME,
toolCallId: "heartbeat-1",
args: {
outcome: "no_change",
notify: false,
summary: "Nothing needs attention.",
},
isError: false,
result,
});
emitToolRun({
emit,
toolName: HEARTBEAT_RESPONSE_TOOL_NAME,
toolCallId: "heartbeat-2",
args: {
outcome: "no_change",
notify: false,
summary: "Nothing needs attention.",
},
isError: false,
result,
});
await flushBlockReplyCallbacks();
expect(subscription.getHeartbeatToolResponse()).toEqual({
outcome: "no_change",
notify: false,
summary: "Nothing needs attention.",
});
expect(onHeartbeatToolResponse).toHaveBeenCalledTimes(1);
expect(onHeartbeatToolResponse).toHaveBeenCalledWith({
outcome: "no_change",
notify: false,
summary: "Nothing needs attention.",
});
});
});

View File

@@ -3,6 +3,7 @@ import type {
PartialReplyPayload,
SourceReplyDeliveryMode,
} from "../auto-reply/get-reply-options.types.js";
import type { HeartbeatToolResponse } from "../auto-reply/heartbeat-tool-response.js";
import type { ReplyPayload } from "../auto-reply/reply-payload.js";
import type { ReasoningLevel, ThinkLevel, VerboseLevel } from "../auto-reply/thinking.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
@@ -56,6 +57,7 @@ export type SubscribeEmbeddedPiSessionParams = {
data: Record<string, unknown>;
sessionKey?: string;
}) => void | Promise<void>;
onHeartbeatToolResponse?: (response: HeartbeatToolResponse) => void | Promise<void>;
terminalLifecyclePhase?: "end" | "finishing";
/** Best-effort hook invoked immediately before the terminal lifecycle event is emitted. */
onBeforeLifecycleTerminal?: () => void | Promise<void>;

View File

@@ -53,6 +53,24 @@ describe("createHeartbeatResponseTool", () => {
expect(details.summary).toBe("Nothing needs attention.");
});
it("rejects repeated heartbeat responses from the same tool instance", async () => {
const tool = createHeartbeatResponseTool();
await tool.execute("call-1", {
outcome: "no_change",
notify: false,
summary: "Nothing needs attention.",
});
await expect(
tool.execute("call-2", {
outcome: "no_change",
notify: false,
summary: "Nothing needs attention.",
}),
).rejects.toThrow("heartbeat_respond already recorded");
});
it("accepts notification text and optional scheduling metadata", async () => {
const tool = createHeartbeatResponseTool();

View File

@@ -36,6 +36,7 @@ function readRequiredBoolean(params: Record<string, unknown>, key: string): bool
}
export function createHeartbeatResponseTool(): AnyAgentTool {
let recorded = false;
return {
label: "Heartbeat",
name: HEARTBEAT_RESPONSE_TOOL_NAME,
@@ -54,6 +55,10 @@ export function createHeartbeatResponseTool(): AnyAgentTool {
"Invalid heartbeat response. Provide outcome, notify, and non-empty summary.",
);
}
if (recorded) {
throw new ToolInputError("heartbeat_respond already recorded for this turn");
}
recorded = true;
return jsonResult({
status: "recorded",
...response,