From 57f9f0a08dd72511919f37a960596861ba2c00ba Mon Sep 17 00:00:00 2001 From: David Date: Mon, 6 Apr 2026 18:56:38 +0800 Subject: [PATCH] fix: stop heartbeat transcript truncation races (#60998) (thanks @nxmxbbd) --- CHANGELOG.md | 2 + src/agents/pi-embedded-runner/compact.ts | 7 + src/agents/pi-embedded-runner/run/attempt.ts | 18 ++- .../pi-embedded-runner/session-truncation.ts | 25 ++++ src/auto-reply/heartbeat-filter.test.ts | 141 ++++++++++++++++++ src/auto-reply/heartbeat-filter.ts | 96 ++++++++++++ .../heartbeat-runner.transcript-prune.test.ts | 24 ++- src/infra/heartbeat-runner.ts | 73 +-------- 8 files changed, 301 insertions(+), 85 deletions(-) create mode 100644 src/auto-reply/heartbeat-filter.test.ts create mode 100644 src/auto-reply/heartbeat-filter.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 24c56d177e1..5a00f1dd3aa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ Docs: https://docs.openclaw.ai - Providers/Anthropic: skip `service_tier` injection for OAuth-authenticated stream wrapper requests so Claude OAuth requests stop failing with HTTP 401. (#60356) thanks @openperf. - Agents/exec: preserve explicit `host=node` routing under elevated defaults when `tools.exec.host=auto`, and fail loud on invalid elevated cross-host overrides. (#61739) Thanks @obviyus. - Docs/i18n: remove the zh-CN homepage redirect override so Mintlify can resolve the localized Chinese homepage without self-redirecting `/zh-CN/index`. +- Agents/heartbeat: stop truncating live session transcripts after no-op heartbeat acks, move heartbeat cleanup to prompt assembly and compaction, and keep post-filter context-engine ingestion aligned with the real session baseline. (#60998) Thanks @nxmxbbd. + ## 2026.4.5 ### Breaking diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index 429aba0164a..b768b33b007 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -15,6 +15,7 @@ import { ensureContextEnginesInitialized, resolveContextEngine, } from "../../context-engine/index.js"; +import { resolveHeartbeatSummaryForAgent } from "../../infra/heartbeat-summary.js"; import { getMachineDisplayName } from "../../infra/machine-name.js"; import { generateSecureToken } from "../../infra/secure-random.js"; import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js"; @@ -1004,8 +1005,14 @@ export async function compactEmbeddedPiSessionDirect( // Truncate session file to remove compacted entries (#39953) if (params.config?.agents?.defaults?.compaction?.truncateAfterCompaction) { try { + const heartbeatSummary = resolveHeartbeatSummaryForAgent( + params.config, + sessionAgentId, + ); const truncResult = await truncateSessionAfterCompaction({ sessionFile: params.sessionFile, + ackMaxChars: heartbeatSummary.ackMaxChars, + heartbeatPrompt: heartbeatSummary.prompt, }); if (truncResult.truncated) { log.info( diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 274775e47eb..ebae9edc5bf 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -6,8 +6,10 @@ import { DefaultResourceLoader, SessionManager, } from "@mariozechner/pi-coding-agent"; +import { filterHeartbeatPairs } from "../../../auto-reply/heartbeat-filter.js"; import { resolveHeartbeatPrompt } from "../../../auto-reply/heartbeat.js"; import { resolveChannelCapabilities } from "../../../config/channel-capabilities.js"; +import { resolveHeartbeatSummaryForAgent } from "../../../infra/heartbeat-summary.js"; import { getMachineDisplayName } from "../../../infra/machine-name.js"; import { ensureGlobalUndiciEnvProxyDispatcher, @@ -1511,7 +1513,7 @@ export async function runEmbeddedAttempt( let promptError: unknown = null; let promptErrorSource: "prompt" | "compaction" | null = null; - const prePromptMessageCount = activeSession.messages.length; + let prePromptMessageCount = activeSession.messages.length; try { const promptStartedAt = Date.now(); @@ -1665,6 +1667,20 @@ export async function runEmbeddedAttempt( activeSession.agent.state.messages = activeSession.messages; } + const heartbeatSummary = + params.config && sessionAgentId + ? resolveHeartbeatSummaryForAgent(params.config, sessionAgentId) + : undefined; + const filteredMessages = filterHeartbeatPairs( + activeSession.messages, + heartbeatSummary?.ackMaxChars, + heartbeatSummary?.prompt, + ); + if (filteredMessages.length < activeSession.messages.length) { + activeSession.agent.state.messages = filteredMessages; + } + prePromptMessageCount = activeSession.messages.length; + // Detect and load images referenced in the prompt for vision-capable models. // Images are prompt-local only (pi-like behavior). const imageResult = await detectAndLoadPromptImages({ diff --git a/src/agents/pi-embedded-runner/session-truncation.ts b/src/agents/pi-embedded-runner/session-truncation.ts index 9b87e962672..b3c0be8b941 100644 --- a/src/agents/pi-embedded-runner/session-truncation.ts +++ b/src/agents/pi-embedded-runner/session-truncation.ts @@ -2,6 +2,10 @@ import fs from "node:fs/promises"; import path from "node:path"; import type { CompactionEntry, SessionEntry } from "@mariozechner/pi-coding-agent"; import { SessionManager } from "@mariozechner/pi-coding-agent"; +import { + isHeartbeatOkResponse, + isHeartbeatUserMessage, +} from "../../auto-reply/heartbeat-filter.js"; import { log } from "./logger.js"; /** @@ -34,6 +38,8 @@ export async function truncateSessionAfterCompaction(params: { sessionFile: string; /** Optional path to archive the pre-truncation file. */ archivePath?: string; + ackMaxChars?: number; + heartbeatPrompt?: string; }): Promise { const { sessionFile } = params; @@ -111,6 +117,25 @@ export async function truncateSessionAfterCompaction(params: { } } + for (let i = 0; i < branch.length - 1; i++) { + const userEntry = branch[i]; + const assistantEntry = branch[i + 1]; + if ( + userEntry.type === "message" && + assistantEntry.type === "message" && + summarizedBranchIds.has(userEntry.id) && + summarizedBranchIds.has(assistantEntry.id) && + !removedIds.has(userEntry.id) && + !removedIds.has(assistantEntry.id) && + isHeartbeatUserMessage(userEntry.message, params.heartbeatPrompt) && + isHeartbeatOkResponse(assistantEntry.message, params.ackMaxChars) + ) { + removedIds.add(userEntry.id); + removedIds.add(assistantEntry.id); + i++; + } + } + // Labels bookmark targetId while parentId just records the leaf when the // label was changed, so targetId determines whether the label is still valid. // Branch summaries still hang off the summarized branch via parentId. diff --git a/src/auto-reply/heartbeat-filter.test.ts b/src/auto-reply/heartbeat-filter.test.ts new file mode 100644 index 00000000000..a8faa453701 --- /dev/null +++ b/src/auto-reply/heartbeat-filter.test.ts @@ -0,0 +1,141 @@ +import { describe, expect, it } from "vitest"; +import { + filterHeartbeatPairs, + isHeartbeatOkResponse, + isHeartbeatUserMessage, +} from "./heartbeat-filter.js"; +import { HEARTBEAT_PROMPT } from "./heartbeat.js"; + +describe("isHeartbeatUserMessage", () => { + it("matches heartbeat prompts", () => { + expect( + isHeartbeatUserMessage( + { + role: "user", + content: `${HEARTBEAT_PROMPT}\nWhen reading HEARTBEAT.md, use workspace file /tmp/HEARTBEAT.md (exact case). Do not read docs/heartbeat.md.`, + }, + HEARTBEAT_PROMPT, + ), + ).toBe(true); + + expect( + isHeartbeatUserMessage({ + role: "user", + content: + "Run the following periodic tasks (only those due based on their intervals):\n\n- email-check: Check for urgent unread emails\n\nAfter completing all due tasks, reply HEARTBEAT_OK.", + }), + ).toBe(true); + }); + + it("ignores quoted or non-user token mentions", () => { + expect( + isHeartbeatUserMessage({ + role: "user", + content: "Please reply HEARTBEAT_OK so I can test something.", + }), + ).toBe(false); + + expect( + isHeartbeatUserMessage({ + role: "assistant", + content: "HEARTBEAT_OK", + }), + ).toBe(false); + }); +}); + +describe("isHeartbeatOkResponse", () => { + it("matches no-op heartbeat acknowledgements", () => { + expect( + isHeartbeatOkResponse({ + role: "assistant", + content: "**HEARTBEAT_OK**", + }), + ).toBe(true); + + expect( + isHeartbeatOkResponse({ + role: "assistant", + content: "You have 3 unread urgent emails. HEARTBEAT_OK", + }), + ).toBe(true); + }); + + it("preserves meaningful or non-text responses", () => { + expect( + isHeartbeatOkResponse({ + role: "assistant", + content: "Status HEARTBEAT_OK due to watchdog failure", + }), + ).toBe(false); + + expect( + isHeartbeatOkResponse({ + role: "assistant", + content: [{ type: "tool_use", id: "tool-1", name: "search", input: {} }], + }), + ).toBe(false); + }); + + it("respects ackMaxChars overrides", () => { + expect( + isHeartbeatOkResponse( + { + role: "assistant", + content: "HEARTBEAT_OK all good", + }, + 0, + ), + ).toBe(false); + }); +}); + +describe("filterHeartbeatPairs", () => { + it("removes no-op heartbeat pairs", () => { + const messages = [ + { role: "user", content: "Hello" }, + { role: "assistant", content: "Hi there!" }, + { role: "user", content: HEARTBEAT_PROMPT }, + { role: "assistant", content: "HEARTBEAT_OK" }, + { role: "user", content: "What time is it?" }, + { role: "assistant", content: "It is 3pm." }, + ]; + + expect(filterHeartbeatPairs(messages, undefined, HEARTBEAT_PROMPT)).toEqual([ + { role: "user", content: "Hello" }, + { role: "assistant", content: "Hi there!" }, + { role: "user", content: "What time is it?" }, + { role: "assistant", content: "It is 3pm." }, + ]); + }); + + it("keeps meaningful heartbeat results and non-text assistant turns", () => { + const meaningfulMessages = [ + { role: "user", content: HEARTBEAT_PROMPT }, + { role: "assistant", content: "Status HEARTBEAT_OK due to watchdog failure" }, + ]; + expect(filterHeartbeatPairs(meaningfulMessages, undefined, HEARTBEAT_PROMPT)).toEqual( + meaningfulMessages, + ); + + const nonTextMessages = [ + { role: "user", content: HEARTBEAT_PROMPT }, + { + role: "assistant", + content: [{ type: "tool_use", id: "tool-1", name: "search", input: {} }], + }, + ]; + expect(filterHeartbeatPairs(nonTextMessages, undefined, HEARTBEAT_PROMPT)).toEqual( + nonTextMessages, + ); + }); + + it("keeps ordinary chats that mention the token", () => { + const messages = [ + { role: "user", content: "Please reply HEARTBEAT_OK so I can test something." }, + { role: "assistant", content: "HEARTBEAT_OK" }, + ]; + + expect(filterHeartbeatPairs(messages, undefined, HEARTBEAT_PROMPT)).toEqual(messages); + }); +}); diff --git a/src/auto-reply/heartbeat-filter.ts b/src/auto-reply/heartbeat-filter.ts new file mode 100644 index 00000000000..f7b2eda0267 --- /dev/null +++ b/src/auto-reply/heartbeat-filter.ts @@ -0,0 +1,96 @@ +import { stripHeartbeatToken } from "./heartbeat.js"; + +const HEARTBEAT_TASK_PROMPT_PREFIX = + "Run the following periodic tasks (only those due based on their intervals):"; +const HEARTBEAT_TASK_PROMPT_ACK = "After completing all due tasks, reply HEARTBEAT_OK."; + +function resolveMessageText(content: unknown): { text: string; hasNonTextContent: boolean } { + if (typeof content === "string") { + return { text: content, hasNonTextContent: false }; + } + if (!Array.isArray(content)) { + return { text: "", hasNonTextContent: content != null }; + } + let hasNonTextContent = false; + const text = content + .filter((block): block is { type: "text"; text: string } => { + if (typeof block !== "object" || block === null || !("type" in block)) { + hasNonTextContent = true; + return false; + } + if (block.type !== "text") { + hasNonTextContent = true; + return false; + } + if (typeof (block as { text?: unknown }).text !== "string") { + hasNonTextContent = true; + return false; + } + return true; + }) + .map((block) => block.text) + .join(""); + return { text, hasNonTextContent }; +} + +export function isHeartbeatUserMessage( + message: { role: string; content?: unknown }, + heartbeatPrompt?: string, +): boolean { + if (message.role !== "user") { + return false; + } + const { text } = resolveMessageText(message.content); + const trimmed = text.trim(); + if (!trimmed) { + return false; + } + const normalizedHeartbeatPrompt = heartbeatPrompt?.trim(); + if (normalizedHeartbeatPrompt && trimmed.startsWith(normalizedHeartbeatPrompt)) { + return true; + } + return ( + trimmed.startsWith(HEARTBEAT_TASK_PROMPT_PREFIX) && trimmed.includes(HEARTBEAT_TASK_PROMPT_ACK) + ); +} + +export function isHeartbeatOkResponse( + message: { role: string; content?: unknown }, + ackMaxChars?: number, +): boolean { + if (message.role !== "assistant") { + return false; + } + const { text, hasNonTextContent } = resolveMessageText(message.content); + if (hasNonTextContent) { + return false; + } + return stripHeartbeatToken(text, { mode: "heartbeat", maxAckChars: ackMaxChars }).shouldSkip; +} + +export function filterHeartbeatPairs( + messages: T[], + ackMaxChars?: number, + heartbeatPrompt?: string, +): T[] { + if (messages.length < 2) { + return messages; + } + + const result: T[] = []; + let i = 0; + while (i < messages.length) { + if ( + i + 1 < messages.length && + isHeartbeatUserMessage(messages[i], heartbeatPrompt) && + isHeartbeatOkResponse(messages[i + 1], ackMaxChars) + ) { + i += 2; + continue; + } + result.push(messages[i]); + i++; + } + + return result; +} diff --git a/src/infra/heartbeat-runner.transcript-prune.test.ts b/src/infra/heartbeat-runner.transcript-prune.test.ts index 264bc60e764..7b8eac89c9f 100644 --- a/src/infra/heartbeat-runner.transcript-prune.test.ts +++ b/src/infra/heartbeat-runner.transcript-prune.test.ts @@ -14,7 +14,7 @@ beforeEach(() => { setupTelegramHeartbeatPluginRuntimeForTests(); }); -describe("heartbeat transcript pruning", () => { +describe("heartbeat transcript append-only (#39609)", () => { async function createTranscriptWithContent(transcriptPath: string, sessionId: string) { const header = { type: "session", @@ -40,13 +40,12 @@ describe("heartbeat transcript pruning", () => { cacheWriteTokens: number; }; }; - expectPruned: boolean; }) { await withTempTelegramHeartbeatSandbox( async ({ tmpDir, storePath, replySpy }) => { const sessionKey = resolveMainSessionKey(undefined); const transcriptPath = path.join(tmpDir, `${params.sessionId}.jsonl`); - const originalContent = await createTranscriptWithContent(transcriptPath, params.sessionId); + await createTranscriptWithContent(transcriptPath, params.sessionId); const originalSize = (await fs.stat(transcriptPath)).size; await seedSessionStore(storePath, sessionKey, { @@ -77,37 +76,32 @@ describe("heartbeat transcript pruning", () => { }); const finalSize = (await fs.stat(transcriptPath)).size; - if (params.expectPruned) { - const finalContent = await fs.readFile(transcriptPath, "utf-8"); - expect(finalContent).toBe(originalContent); - expect(finalSize).toBe(originalSize); - return; - } + // Transcript must never be truncated — entries are append-only now. + // HEARTBEAT_OK entries stay in the file and are filtered at context + // build time instead of being removed via fs.truncate (#39609). expect(finalSize).toBeGreaterThanOrEqual(originalSize); }, { prefix: "openclaw-hb-prune-" }, ); } - it("prunes transcript when heartbeat returns HEARTBEAT_OK", async () => { + it("does not truncate transcript when heartbeat returns HEARTBEAT_OK", async () => { await runTranscriptScenario({ - sessionId: "test-session-prune", + sessionId: "test-session-no-prune", reply: { text: "HEARTBEAT_OK", usage: { inputTokens: 0, outputTokens: 0, cacheReadTokens: 0, cacheWriteTokens: 0 }, }, - expectPruned: true, }); }); - it("does not prune transcript when heartbeat returns meaningful content", async () => { + it("does not truncate transcript when heartbeat returns meaningful content", async () => { await runTranscriptScenario({ - sessionId: "test-session-no-prune", + sessionId: "test-session-content", reply: { text: "Alert: Something needs your attention!", usage: { inputTokens: 10, outputTokens: 20, cacheReadTokens: 0, cacheWriteTokens: 0 }, }, - expectPruned: false, }); }); }); diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index 64441b23fa0..8a8638b2e79 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -33,7 +33,7 @@ import { canonicalizeMainSessionAlias, resolveAgentMainSessionKey, } from "../config/sessions/main-session.js"; -import { resolveSessionFilePath, resolveStorePath } from "../config/sessions/paths.js"; +import { resolveStorePath } from "../config/sessions/paths.js"; import { loadSessionStore } from "../config/sessions/store-load.js"; import { saveSessionStore, updateSessionStore } from "../config/sessions/store.js"; import type { AgentDefaultsConfig } from "../config/types.agent-defaults.js"; @@ -300,58 +300,6 @@ async function restoreHeartbeatUpdatedAt(params: { }); } -/** - * Prune heartbeat transcript entries by truncating the file back to a previous size. - * This removes the user+assistant turns that were written during a HEARTBEAT_OK run, - * preventing context pollution from zero-information exchanges. - */ -async function pruneHeartbeatTranscript(params: { - transcriptPath?: string; - preHeartbeatSize?: number; -}) { - const { transcriptPath, preHeartbeatSize } = params; - if (!transcriptPath || typeof preHeartbeatSize !== "number" || preHeartbeatSize < 0) { - return; - } - try { - const stat = await fs.stat(transcriptPath); - // Only truncate if the file has grown during the heartbeat run - if (stat.size > preHeartbeatSize) { - await fs.truncate(transcriptPath, preHeartbeatSize); - } - } catch { - // File may not exist or may have been removed - ignore errors - } -} - -/** - * Get the transcript file path and its current size before a heartbeat run. - * Returns undefined values if the session or transcript doesn't exist yet. - */ -async function captureTranscriptState(params: { - storePath: string; - sessionKey: string; - agentId?: string; -}): Promise<{ transcriptPath?: string; preHeartbeatSize?: number }> { - const { storePath, sessionKey, agentId } = params; - try { - const store = loadSessionStore(storePath); - const entry = store[sessionKey]; - if (!entry?.sessionId) { - return {}; - } - const transcriptPath = resolveSessionFilePath(entry.sessionId, entry, { - agentId, - sessionsDir: path.dirname(storePath), - }); - const stat = await fs.stat(transcriptPath); - return { transcriptPath, preHeartbeatSize: stat.size }; - } catch { - // Session or transcript doesn't exist yet - nothing to prune - return {}; - } -} - function stripLeadingHeartbeatResponsePrefix( text: string, responsePrefix: string | undefined, @@ -715,7 +663,6 @@ export async function runHeartbeatOnce(opts: { } let runSessionKey = sessionKey; - let runStorePath = storePath; if (useIsolatedSession) { const isolatedKey = `${sessionKey}:heartbeat`; const cronSession = resolveCronSession({ @@ -728,7 +675,6 @@ export async function runHeartbeatOnce(opts: { cronSession.store[isolatedKey] = cronSession.sessionEntry; await saveSessionStore(cronSession.storePath, cronSession.store); runSessionKey = isolatedKey; - runStorePath = cronSession.storePath; } // Update task last run times AFTER successful heartbeat completion @@ -822,14 +768,6 @@ export async function runHeartbeatOnce(opts: { }; try { - // Capture transcript state before the heartbeat run so we can prune if HEARTBEAT_OK. - // For isolated sessions, capture the isolated transcript (not the main session's). - const transcriptState = await captureTranscriptState({ - storePath: runStorePath, - sessionKey: runSessionKey, - agentId, - }); - const heartbeatModelOverride = heartbeat?.model?.trim() || undefined; const suppressToolErrorWarnings = heartbeat?.suppressToolErrorWarnings === true; const bootstrapContextMode: "lightweight" | undefined = @@ -857,8 +795,7 @@ export async function runHeartbeatOnce(opts: { sessionKey, updatedAt: previousUpdatedAt, }); - // Prune the transcript to remove HEARTBEAT_OK turns - await pruneHeartbeatTranscript(transcriptState); + const okSent = await maybeSendHeartbeatOk(); emitHeartbeatEvent({ status: "ok-empty", @@ -894,8 +831,7 @@ export async function runHeartbeatOnce(opts: { sessionKey, updatedAt: previousUpdatedAt, }); - // Prune the transcript to remove HEARTBEAT_OK turns - await pruneHeartbeatTranscript(transcriptState); + const okSent = await maybeSendHeartbeatOk(); emitHeartbeatEvent({ status: "ok-token", @@ -932,8 +868,7 @@ export async function runHeartbeatOnce(opts: { sessionKey, updatedAt: previousUpdatedAt, }); - // Prune the transcript to remove duplicate heartbeat turns - await pruneHeartbeatTranscript(transcriptState); + emitHeartbeatEvent({ status: "skipped", reason: "duplicate",