fix: stop heartbeat transcript truncation races (#60998) (thanks @nxmxbbd)

This commit is contained in:
David
2026-04-06 18:56:38 +08:00
committed by GitHub
parent 4154bd707a
commit 57f9f0a08d
8 changed files with 301 additions and 85 deletions

View File

@@ -9,6 +9,8 @@ Docs: https://docs.openclaw.ai
- Providers/Anthropic: skip `service_tier` injection for OAuth-authenticated stream wrapper requests so Claude OAuth requests stop failing with HTTP 401. (#60356) thanks @openperf.
- Agents/exec: preserve explicit `host=node` routing under elevated defaults when `tools.exec.host=auto`, and fail loud on invalid elevated cross-host overrides. (#61739) Thanks @obviyus.
- Docs/i18n: remove the zh-CN homepage redirect override so Mintlify can resolve the localized Chinese homepage without self-redirecting `/zh-CN/index`.
- Agents/heartbeat: stop truncating live session transcripts after no-op heartbeat acks, move heartbeat cleanup to prompt assembly and compaction, and keep post-filter context-engine ingestion aligned with the real session baseline. (#60998) Thanks @nxmxbbd.
## 2026.4.5
### Breaking

View File

@@ -15,6 +15,7 @@ import {
ensureContextEnginesInitialized,
resolveContextEngine,
} from "../../context-engine/index.js";
import { resolveHeartbeatSummaryForAgent } from "../../infra/heartbeat-summary.js";
import { getMachineDisplayName } from "../../infra/machine-name.js";
import { generateSecureToken } from "../../infra/secure-random.js";
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
@@ -1004,8 +1005,14 @@ export async function compactEmbeddedPiSessionDirect(
// Truncate session file to remove compacted entries (#39953)
if (params.config?.agents?.defaults?.compaction?.truncateAfterCompaction) {
try {
const heartbeatSummary = resolveHeartbeatSummaryForAgent(
params.config,
sessionAgentId,
);
const truncResult = await truncateSessionAfterCompaction({
sessionFile: params.sessionFile,
ackMaxChars: heartbeatSummary.ackMaxChars,
heartbeatPrompt: heartbeatSummary.prompt,
});
if (truncResult.truncated) {
log.info(

View File

@@ -6,8 +6,10 @@ import {
DefaultResourceLoader,
SessionManager,
} from "@mariozechner/pi-coding-agent";
import { filterHeartbeatPairs } from "../../../auto-reply/heartbeat-filter.js";
import { resolveHeartbeatPrompt } from "../../../auto-reply/heartbeat.js";
import { resolveChannelCapabilities } from "../../../config/channel-capabilities.js";
import { resolveHeartbeatSummaryForAgent } from "../../../infra/heartbeat-summary.js";
import { getMachineDisplayName } from "../../../infra/machine-name.js";
import {
ensureGlobalUndiciEnvProxyDispatcher,
@@ -1511,7 +1513,7 @@ export async function runEmbeddedAttempt(
let promptError: unknown = null;
let promptErrorSource: "prompt" | "compaction" | null = null;
const prePromptMessageCount = activeSession.messages.length;
let prePromptMessageCount = activeSession.messages.length;
try {
const promptStartedAt = Date.now();
@@ -1665,6 +1667,20 @@ export async function runEmbeddedAttempt(
activeSession.agent.state.messages = activeSession.messages;
}
const heartbeatSummary =
params.config && sessionAgentId
? resolveHeartbeatSummaryForAgent(params.config, sessionAgentId)
: undefined;
const filteredMessages = filterHeartbeatPairs(
activeSession.messages,
heartbeatSummary?.ackMaxChars,
heartbeatSummary?.prompt,
);
if (filteredMessages.length < activeSession.messages.length) {
activeSession.agent.state.messages = filteredMessages;
}
prePromptMessageCount = activeSession.messages.length;
// Detect and load images referenced in the prompt for vision-capable models.
// Images are prompt-local only (pi-like behavior).
const imageResult = await detectAndLoadPromptImages({

View File

@@ -2,6 +2,10 @@ import fs from "node:fs/promises";
import path from "node:path";
import type { CompactionEntry, SessionEntry } from "@mariozechner/pi-coding-agent";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import {
isHeartbeatOkResponse,
isHeartbeatUserMessage,
} from "../../auto-reply/heartbeat-filter.js";
import { log } from "./logger.js";
/**
@@ -34,6 +38,8 @@ export async function truncateSessionAfterCompaction(params: {
sessionFile: string;
/** Optional path to archive the pre-truncation file. */
archivePath?: string;
ackMaxChars?: number;
heartbeatPrompt?: string;
}): Promise<TruncationResult> {
const { sessionFile } = params;
@@ -111,6 +117,25 @@ export async function truncateSessionAfterCompaction(params: {
}
}
for (let i = 0; i < branch.length - 1; i++) {
const userEntry = branch[i];
const assistantEntry = branch[i + 1];
if (
userEntry.type === "message" &&
assistantEntry.type === "message" &&
summarizedBranchIds.has(userEntry.id) &&
summarizedBranchIds.has(assistantEntry.id) &&
!removedIds.has(userEntry.id) &&
!removedIds.has(assistantEntry.id) &&
isHeartbeatUserMessage(userEntry.message, params.heartbeatPrompt) &&
isHeartbeatOkResponse(assistantEntry.message, params.ackMaxChars)
) {
removedIds.add(userEntry.id);
removedIds.add(assistantEntry.id);
i++;
}
}
// Labels bookmark targetId while parentId just records the leaf when the
// label was changed, so targetId determines whether the label is still valid.
// Branch summaries still hang off the summarized branch via parentId.

View File

@@ -0,0 +1,141 @@
import { describe, expect, it } from "vitest";
import {
filterHeartbeatPairs,
isHeartbeatOkResponse,
isHeartbeatUserMessage,
} from "./heartbeat-filter.js";
import { HEARTBEAT_PROMPT } from "./heartbeat.js";
describe("isHeartbeatUserMessage", () => {
it("matches heartbeat prompts", () => {
expect(
isHeartbeatUserMessage(
{
role: "user",
content: `${HEARTBEAT_PROMPT}\nWhen reading HEARTBEAT.md, use workspace file /tmp/HEARTBEAT.md (exact case). Do not read docs/heartbeat.md.`,
},
HEARTBEAT_PROMPT,
),
).toBe(true);
expect(
isHeartbeatUserMessage({
role: "user",
content:
"Run the following periodic tasks (only those due based on their intervals):\n\n- email-check: Check for urgent unread emails\n\nAfter completing all due tasks, reply HEARTBEAT_OK.",
}),
).toBe(true);
});
it("ignores quoted or non-user token mentions", () => {
expect(
isHeartbeatUserMessage({
role: "user",
content: "Please reply HEARTBEAT_OK so I can test something.",
}),
).toBe(false);
expect(
isHeartbeatUserMessage({
role: "assistant",
content: "HEARTBEAT_OK",
}),
).toBe(false);
});
});
describe("isHeartbeatOkResponse", () => {
it("matches no-op heartbeat acknowledgements", () => {
expect(
isHeartbeatOkResponse({
role: "assistant",
content: "**HEARTBEAT_OK**",
}),
).toBe(true);
expect(
isHeartbeatOkResponse({
role: "assistant",
content: "You have 3 unread urgent emails. HEARTBEAT_OK",
}),
).toBe(true);
});
it("preserves meaningful or non-text responses", () => {
expect(
isHeartbeatOkResponse({
role: "assistant",
content: "Status HEARTBEAT_OK due to watchdog failure",
}),
).toBe(false);
expect(
isHeartbeatOkResponse({
role: "assistant",
content: [{ type: "tool_use", id: "tool-1", name: "search", input: {} }],
}),
).toBe(false);
});
it("respects ackMaxChars overrides", () => {
expect(
isHeartbeatOkResponse(
{
role: "assistant",
content: "HEARTBEAT_OK all good",
},
0,
),
).toBe(false);
});
});
describe("filterHeartbeatPairs", () => {
it("removes no-op heartbeat pairs", () => {
const messages = [
{ role: "user", content: "Hello" },
{ role: "assistant", content: "Hi there!" },
{ role: "user", content: HEARTBEAT_PROMPT },
{ role: "assistant", content: "HEARTBEAT_OK" },
{ role: "user", content: "What time is it?" },
{ role: "assistant", content: "It is 3pm." },
];
expect(filterHeartbeatPairs(messages, undefined, HEARTBEAT_PROMPT)).toEqual([
{ role: "user", content: "Hello" },
{ role: "assistant", content: "Hi there!" },
{ role: "user", content: "What time is it?" },
{ role: "assistant", content: "It is 3pm." },
]);
});
it("keeps meaningful heartbeat results and non-text assistant turns", () => {
const meaningfulMessages = [
{ role: "user", content: HEARTBEAT_PROMPT },
{ role: "assistant", content: "Status HEARTBEAT_OK due to watchdog failure" },
];
expect(filterHeartbeatPairs(meaningfulMessages, undefined, HEARTBEAT_PROMPT)).toEqual(
meaningfulMessages,
);
const nonTextMessages = [
{ role: "user", content: HEARTBEAT_PROMPT },
{
role: "assistant",
content: [{ type: "tool_use", id: "tool-1", name: "search", input: {} }],
},
];
expect(filterHeartbeatPairs(nonTextMessages, undefined, HEARTBEAT_PROMPT)).toEqual(
nonTextMessages,
);
});
it("keeps ordinary chats that mention the token", () => {
const messages = [
{ role: "user", content: "Please reply HEARTBEAT_OK so I can test something." },
{ role: "assistant", content: "HEARTBEAT_OK" },
];
expect(filterHeartbeatPairs(messages, undefined, HEARTBEAT_PROMPT)).toEqual(messages);
});
});

View File

@@ -0,0 +1,96 @@
import { stripHeartbeatToken } from "./heartbeat.js";
const HEARTBEAT_TASK_PROMPT_PREFIX =
"Run the following periodic tasks (only those due based on their intervals):";
const HEARTBEAT_TASK_PROMPT_ACK = "After completing all due tasks, reply HEARTBEAT_OK.";
function resolveMessageText(content: unknown): { text: string; hasNonTextContent: boolean } {
if (typeof content === "string") {
return { text: content, hasNonTextContent: false };
}
if (!Array.isArray(content)) {
return { text: "", hasNonTextContent: content != null };
}
let hasNonTextContent = false;
const text = content
.filter((block): block is { type: "text"; text: string } => {
if (typeof block !== "object" || block === null || !("type" in block)) {
hasNonTextContent = true;
return false;
}
if (block.type !== "text") {
hasNonTextContent = true;
return false;
}
if (typeof (block as { text?: unknown }).text !== "string") {
hasNonTextContent = true;
return false;
}
return true;
})
.map((block) => block.text)
.join("");
return { text, hasNonTextContent };
}
export function isHeartbeatUserMessage(
message: { role: string; content?: unknown },
heartbeatPrompt?: string,
): boolean {
if (message.role !== "user") {
return false;
}
const { text } = resolveMessageText(message.content);
const trimmed = text.trim();
if (!trimmed) {
return false;
}
const normalizedHeartbeatPrompt = heartbeatPrompt?.trim();
if (normalizedHeartbeatPrompt && trimmed.startsWith(normalizedHeartbeatPrompt)) {
return true;
}
return (
trimmed.startsWith(HEARTBEAT_TASK_PROMPT_PREFIX) && trimmed.includes(HEARTBEAT_TASK_PROMPT_ACK)
);
}
export function isHeartbeatOkResponse(
message: { role: string; content?: unknown },
ackMaxChars?: number,
): boolean {
if (message.role !== "assistant") {
return false;
}
const { text, hasNonTextContent } = resolveMessageText(message.content);
if (hasNonTextContent) {
return false;
}
return stripHeartbeatToken(text, { mode: "heartbeat", maxAckChars: ackMaxChars }).shouldSkip;
}
export function filterHeartbeatPairs<T extends { role: string; content?: unknown }>(
messages: T[],
ackMaxChars?: number,
heartbeatPrompt?: string,
): T[] {
if (messages.length < 2) {
return messages;
}
const result: T[] = [];
let i = 0;
while (i < messages.length) {
if (
i + 1 < messages.length &&
isHeartbeatUserMessage(messages[i], heartbeatPrompt) &&
isHeartbeatOkResponse(messages[i + 1], ackMaxChars)
) {
i += 2;
continue;
}
result.push(messages[i]);
i++;
}
return result;
}

View File

@@ -14,7 +14,7 @@ beforeEach(() => {
setupTelegramHeartbeatPluginRuntimeForTests();
});
describe("heartbeat transcript pruning", () => {
describe("heartbeat transcript append-only (#39609)", () => {
async function createTranscriptWithContent(transcriptPath: string, sessionId: string) {
const header = {
type: "session",
@@ -40,13 +40,12 @@ describe("heartbeat transcript pruning", () => {
cacheWriteTokens: number;
};
};
expectPruned: boolean;
}) {
await withTempTelegramHeartbeatSandbox(
async ({ tmpDir, storePath, replySpy }) => {
const sessionKey = resolveMainSessionKey(undefined);
const transcriptPath = path.join(tmpDir, `${params.sessionId}.jsonl`);
const originalContent = await createTranscriptWithContent(transcriptPath, params.sessionId);
await createTranscriptWithContent(transcriptPath, params.sessionId);
const originalSize = (await fs.stat(transcriptPath)).size;
await seedSessionStore(storePath, sessionKey, {
@@ -77,37 +76,32 @@ describe("heartbeat transcript pruning", () => {
});
const finalSize = (await fs.stat(transcriptPath)).size;
if (params.expectPruned) {
const finalContent = await fs.readFile(transcriptPath, "utf-8");
expect(finalContent).toBe(originalContent);
expect(finalSize).toBe(originalSize);
return;
}
// Transcript must never be truncated — entries are append-only now.
// HEARTBEAT_OK entries stay in the file and are filtered at context
// build time instead of being removed via fs.truncate (#39609).
expect(finalSize).toBeGreaterThanOrEqual(originalSize);
},
{ prefix: "openclaw-hb-prune-" },
);
}
it("prunes transcript when heartbeat returns HEARTBEAT_OK", async () => {
it("does not truncate transcript when heartbeat returns HEARTBEAT_OK", async () => {
await runTranscriptScenario({
sessionId: "test-session-prune",
sessionId: "test-session-no-prune",
reply: {
text: "HEARTBEAT_OK",
usage: { inputTokens: 0, outputTokens: 0, cacheReadTokens: 0, cacheWriteTokens: 0 },
},
expectPruned: true,
});
});
it("does not prune transcript when heartbeat returns meaningful content", async () => {
it("does not truncate transcript when heartbeat returns meaningful content", async () => {
await runTranscriptScenario({
sessionId: "test-session-no-prune",
sessionId: "test-session-content",
reply: {
text: "Alert: Something needs your attention!",
usage: { inputTokens: 10, outputTokens: 20, cacheReadTokens: 0, cacheWriteTokens: 0 },
},
expectPruned: false,
});
});
});

View File

@@ -33,7 +33,7 @@ import {
canonicalizeMainSessionAlias,
resolveAgentMainSessionKey,
} from "../config/sessions/main-session.js";
import { resolveSessionFilePath, resolveStorePath } from "../config/sessions/paths.js";
import { resolveStorePath } from "../config/sessions/paths.js";
import { loadSessionStore } from "../config/sessions/store-load.js";
import { saveSessionStore, updateSessionStore } from "../config/sessions/store.js";
import type { AgentDefaultsConfig } from "../config/types.agent-defaults.js";
@@ -300,58 +300,6 @@ async function restoreHeartbeatUpdatedAt(params: {
});
}
/**
* Prune heartbeat transcript entries by truncating the file back to a previous size.
* This removes the user+assistant turns that were written during a HEARTBEAT_OK run,
* preventing context pollution from zero-information exchanges.
*/
async function pruneHeartbeatTranscript(params: {
transcriptPath?: string;
preHeartbeatSize?: number;
}) {
const { transcriptPath, preHeartbeatSize } = params;
if (!transcriptPath || typeof preHeartbeatSize !== "number" || preHeartbeatSize < 0) {
return;
}
try {
const stat = await fs.stat(transcriptPath);
// Only truncate if the file has grown during the heartbeat run
if (stat.size > preHeartbeatSize) {
await fs.truncate(transcriptPath, preHeartbeatSize);
}
} catch {
// File may not exist or may have been removed - ignore errors
}
}
/**
* Get the transcript file path and its current size before a heartbeat run.
* Returns undefined values if the session or transcript doesn't exist yet.
*/
async function captureTranscriptState(params: {
storePath: string;
sessionKey: string;
agentId?: string;
}): Promise<{ transcriptPath?: string; preHeartbeatSize?: number }> {
const { storePath, sessionKey, agentId } = params;
try {
const store = loadSessionStore(storePath);
const entry = store[sessionKey];
if (!entry?.sessionId) {
return {};
}
const transcriptPath = resolveSessionFilePath(entry.sessionId, entry, {
agentId,
sessionsDir: path.dirname(storePath),
});
const stat = await fs.stat(transcriptPath);
return { transcriptPath, preHeartbeatSize: stat.size };
} catch {
// Session or transcript doesn't exist yet - nothing to prune
return {};
}
}
function stripLeadingHeartbeatResponsePrefix(
text: string,
responsePrefix: string | undefined,
@@ -715,7 +663,6 @@ export async function runHeartbeatOnce(opts: {
}
let runSessionKey = sessionKey;
let runStorePath = storePath;
if (useIsolatedSession) {
const isolatedKey = `${sessionKey}:heartbeat`;
const cronSession = resolveCronSession({
@@ -728,7 +675,6 @@ export async function runHeartbeatOnce(opts: {
cronSession.store[isolatedKey] = cronSession.sessionEntry;
await saveSessionStore(cronSession.storePath, cronSession.store);
runSessionKey = isolatedKey;
runStorePath = cronSession.storePath;
}
// Update task last run times AFTER successful heartbeat completion
@@ -822,14 +768,6 @@ export async function runHeartbeatOnce(opts: {
};
try {
// Capture transcript state before the heartbeat run so we can prune if HEARTBEAT_OK.
// For isolated sessions, capture the isolated transcript (not the main session's).
const transcriptState = await captureTranscriptState({
storePath: runStorePath,
sessionKey: runSessionKey,
agentId,
});
const heartbeatModelOverride = heartbeat?.model?.trim() || undefined;
const suppressToolErrorWarnings = heartbeat?.suppressToolErrorWarnings === true;
const bootstrapContextMode: "lightweight" | undefined =
@@ -857,8 +795,7 @@ export async function runHeartbeatOnce(opts: {
sessionKey,
updatedAt: previousUpdatedAt,
});
// Prune the transcript to remove HEARTBEAT_OK turns
await pruneHeartbeatTranscript(transcriptState);
const okSent = await maybeSendHeartbeatOk();
emitHeartbeatEvent({
status: "ok-empty",
@@ -894,8 +831,7 @@ export async function runHeartbeatOnce(opts: {
sessionKey,
updatedAt: previousUpdatedAt,
});
// Prune the transcript to remove HEARTBEAT_OK turns
await pruneHeartbeatTranscript(transcriptState);
const okSent = await maybeSendHeartbeatOk();
emitHeartbeatEvent({
status: "ok-token",
@@ -932,8 +868,7 @@ export async function runHeartbeatOnce(opts: {
sessionKey,
updatedAt: previousUpdatedAt,
});
// Prune the transcript to remove duplicate heartbeat turns
await pruneHeartbeatTranscript(transcriptState);
emitHeartbeatEvent({
status: "skipped",
reason: "duplicate",