From 3c95327b346a653936a2c187e207954f88d304f4 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 27 Apr 2026 01:51:00 +0100 Subject: [PATCH] Fix compacted session transcript rotation --- docs/.generated/config-baseline.sha256 | 6 +- .../.generated/plugin-sdk-api-baseline.sha256 | 4 +- docs/concepts/compaction.md | 6 + docs/concepts/context-engine.md | 4 + .../session-management-compaction.md | 4 + .../bash-tools.exec-host-node-phases.ts | 2 +- src/agents/bash-tools.exec-host-node.ts | 29 +- src/agents/bash-tools.exec-host-node.types.ts | 27 ++ .../pi-embedded-runner/compact.hooks.test.ts | 84 ++++ .../pi-embedded-runner/compact.queued.ts | 61 ++- src/agents/pi-embedded-runner/compact.ts | 76 ++-- .../compaction-successor-transcript.test.ts | 177 +++++++++ .../compaction-successor-transcript.ts | 206 ++++++++++ .../run.overflow-compaction.fixture.ts | 6 + .../run.overflow-compaction.harness.ts | 2 + .../run.overflow-compaction.test.ts | 36 ++ .../run.timeout-triggered-compaction.test.ts | 17 +- src/agents/pi-embedded-runner/run.ts | 69 +++- src/agents/pi-embedded-runner/run/attempt.ts | 35 ++ src/agents/pi-embedded-runner/run/types.ts | 1 + .../session-truncation.test.ts | 368 ------------------ .../pi-embedded-runner/session-truncation.ts | 252 ------------ src/agents/pi-embedded-runner/types.ts | 3 + src/auto-reply/reply/agent-runner-memory.ts | 7 + src/auto-reply/reply/agent-runner.ts | 1 + src/auto-reply/reply/commands-compact.ts | 2 + src/auto-reply/reply/followup-runner.ts | 1 + .../reply/session-run-accounting.ts | 2 + src/auto-reply/reply/session-updates.ts | 17 +- src/cli/update-cli.test.ts | 4 +- src/config/schema.base.generated.ts | 8 +- src/config/schema.help.ts | 2 +- src/config/schema.labels.ts | 2 +- src/config/types.agent-defaults.ts | 5 +- src/context-engine/delegate.ts | 2 + src/context-engine/types.ts | 4 + src/gateway/server-methods/sessions.ts | 6 + src/scripts/test-projects.test.ts | 19 +- 38 files changed, 823 insertions(+), 734 deletions(-) create mode 100644 src/agents/bash-tools.exec-host-node.types.ts create mode 100644 src/agents/pi-embedded-runner/compaction-successor-transcript.test.ts create mode 100644 src/agents/pi-embedded-runner/compaction-successor-transcript.ts delete mode 100644 src/agents/pi-embedded-runner/session-truncation.test.ts delete mode 100644 src/agents/pi-embedded-runner/session-truncation.ts diff --git a/docs/.generated/config-baseline.sha256 b/docs/.generated/config-baseline.sha256 index 2d0e0e2b4d3..9c40ce9f7a8 100644 --- a/docs/.generated/config-baseline.sha256 +++ b/docs/.generated/config-baseline.sha256 @@ -1,4 +1,4 @@ -4d1995e41b659e484afb5a48d6fca0558337123200a4a537f556ca38e8e829e7 config-baseline.json -3245c9a013c55ee8a24db52d5e88c42bc86e26f822d4a144fc7f37fc71e05fa8 config-baseline.core.json +79fa6b9b9df5e22ac56a7edb9bfc25550131e285ce9f4868f468d957a8768240 config-baseline.json +2722504ab6bd37eea9e7542689bd6dba5fb4e485c0eab9c1915427c49a5c5b66 config-baseline.core.json 7cd9c908f066c143eab2a201efbc9640f483ab28bba92ddeca1d18cc2b528bc3 config-baseline.channel.json -f9e0174988718959fe1923a54496ec5b9262721fe1e7306f32ccb1316d9d9c3f config-baseline.plugin.json +74b74cb18ac37c0acaa765f398f1f9edbcee4c43567f02d45c89598a1e13afb4 config-baseline.plugin.json diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index cd7c3e48449..5d6211c3505 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -21914ef8c5840e0defc36d571834dc28a92d6d5ca2d42a088c33b4de681e836a plugin-sdk-api-baseline.json -3f22e6af0dad3433d25d996802d7436a3cc0e68bc86ecaf813a22e2b4e5333eb plugin-sdk-api-baseline.jsonl +ba5191d586958233c69921928e4d13ae6e8af61e26cf57eec6f50c5d551d8b43 plugin-sdk-api-baseline.json +e6fc8ea33cfc6251a080c3a49d0db2e7d82c117f412902c79da359ebbc9197cc plugin-sdk-api-baseline.jsonl diff --git a/docs/concepts/compaction.md b/docs/concepts/compaction.md index 5a1f3dab24a..95e0b851795 100644 --- a/docs/concepts/compaction.md +++ b/docs/concepts/compaction.md @@ -118,6 +118,12 @@ honors that Pi cut-point and keeps the recent tail in rebuilt context. Without an explicit keep budget, manual compaction behaves as a hard checkpoint and continues from the new summary alone. +When `agents.defaults.compaction.truncateAfterCompaction` is enabled, +OpenClaw does not rewrite the existing transcript in place. It creates a new +active successor transcript from the compaction summary, preserved state, and +unsummarized tail, then keeps the previous JSONL as the archived checkpoint +source. + ## Using a different model By default, compaction uses your agent's primary model. You can use a more diff --git a/docs/concepts/context-engine.md b/docs/concepts/context-engine.md index 6426b2b3417..74eb2512f62 100644 --- a/docs/concepts/context-engine.md +++ b/docs/concepts/context-engine.md @@ -194,6 +194,10 @@ Required members: Prepended to the system prompt. +`compact` returns a `CompactResult`. When compaction rotates the active +transcript, `result.sessionId` and `result.sessionFile` identify the successor +session that the next retry or turn must use. + Optional members: | Member | Kind | Purpose | diff --git a/docs/reference/session-management-compaction.md b/docs/reference/session-management-compaction.md index 37a7b65181c..aced6697325 100644 --- a/docs/reference/session-management-compaction.md +++ b/docs/reference/session-management-compaction.md @@ -285,6 +285,10 @@ OpenClaw also enforces a safety floor for embedded runs: and keeps Pi's recent-tail cut point. Without an explicit keep budget, manual compaction remains a hard checkpoint and rebuilt context starts from the new summary. +- When `agents.defaults.compaction.truncateAfterCompaction` is enabled, + OpenClaw rotates the active transcript to a compacted successor JSONL after + compaction. The old full transcript remains archived and linked from the + compaction checkpoint instead of being rewritten in place. Why: leave enough headroom for multi-turn “housekeeping” (like memory writes) before compaction becomes unavoidable. diff --git a/src/agents/bash-tools.exec-host-node-phases.ts b/src/agents/bash-tools.exec-host-node-phases.ts index 04a83f843b7..6adadacb4b7 100644 --- a/src/agents/bash-tools.exec-host-node-phases.ts +++ b/src/agents/bash-tools.exec-host-node-phases.ts @@ -17,7 +17,7 @@ import { buildNodeShellCommand } from "../infra/node-shell.js"; import { parsePreparedSystemRunPayload } from "../infra/system-run-approval-context.js"; import { formatExecCommand, resolveSystemRunCommandRequest } from "../infra/system-run-command.js"; import { normalizeNullableString } from "../shared/string-coerce.js"; -import type { ExecuteNodeHostCommandParams } from "./bash-tools.exec-host-node.js"; +import type { ExecuteNodeHostCommandParams } from "./bash-tools.exec-host-node.types.js"; import type { ExecToolDetails } from "./bash-tools.exec-types.js"; import { callGatewayTool } from "./tools/gateway.js"; import { listNodes, resolveNodeIdFromList } from "./tools/nodes-utils.js"; diff --git a/src/agents/bash-tools.exec-host-node.ts b/src/agents/bash-tools.exec-host-node.ts index 426ca296ebd..9d216bea328 100644 --- a/src/agents/bash-tools.exec-host-node.ts +++ b/src/agents/bash-tools.exec-host-node.ts @@ -1,7 +1,5 @@ import type { AgentToolResult } from "@mariozechner/pi-agent-core"; import { - type ExecAsk, - type ExecSecurity, requiresExecApproval, resolveExecApprovalAllowedDecisions, } from "../infra/exec-approvals.js"; @@ -19,6 +17,7 @@ import { resolveNodeExecutionTarget, shouldSkipNodeApprovalPrepare, } from "./bash-tools.exec-host-node-phases.js"; +import type { ExecuteNodeHostCommandParams } from "./bash-tools.exec-host-node.types.js"; import * as execHostShared from "./bash-tools.exec-host-shared.js"; import { DEFAULT_NOTIFY_TAIL_CHARS, @@ -28,31 +27,7 @@ import { import type { ExecToolDetails } from "./bash-tools.exec-types.js"; import { callGatewayTool } from "./tools/gateway.js"; -export type ExecuteNodeHostCommandParams = { - command: string; - workdir: string | undefined; - env: Record; - requestedEnv?: Record; - requestedNode?: string; - boundNode?: string; - sessionKey?: string; - turnSourceChannel?: string; - turnSourceTo?: string; - turnSourceAccountId?: string; - turnSourceThreadId?: string | number; - trigger?: string; - agentId?: string; - security: ExecSecurity; - ask: ExecAsk; - strictInlineEval?: boolean; - timeoutSec?: number; - defaultTimeoutSec: number; - approvalRunningNoticeMs: number; - warnings: string[]; - notifySessionKey?: string; - notifyOnExit?: boolean; - trustedSafeBinDirs?: ReadonlySet; -}; +export type { ExecuteNodeHostCommandParams } from "./bash-tools.exec-host-node.types.js"; export async function executeNodeHostCommand( params: ExecuteNodeHostCommandParams, diff --git a/src/agents/bash-tools.exec-host-node.types.ts b/src/agents/bash-tools.exec-host-node.types.ts new file mode 100644 index 00000000000..1b212200ad5 --- /dev/null +++ b/src/agents/bash-tools.exec-host-node.types.ts @@ -0,0 +1,27 @@ +import type { ExecAsk, ExecSecurity } from "../infra/exec-approvals.js"; + +export type ExecuteNodeHostCommandParams = { + command: string; + workdir: string | undefined; + env: Record; + requestedEnv?: Record; + requestedNode?: string; + boundNode?: string; + sessionKey?: string; + turnSourceChannel?: string; + turnSourceTo?: string; + turnSourceAccountId?: string; + turnSourceThreadId?: string | number; + trigger?: string; + agentId?: string; + security: ExecSecurity; + ask: ExecAsk; + strictInlineEval?: boolean; + timeoutSec?: number; + defaultTimeoutSec: number; + approvalRunningNoticeMs: number; + warnings: string[]; + notifySessionKey?: string; + notifyOnExit?: boolean; + trustedSafeBinDirs?: ReadonlySet; +}; diff --git a/src/agents/pi-embedded-runner/compact.hooks.test.ts b/src/agents/pi-embedded-runner/compact.hooks.test.ts index fa4e0fcd57a..4795b8c5ab8 100644 --- a/src/agents/pi-embedded-runner/compact.hooks.test.ts +++ b/src/agents/pi-embedded-runner/compact.hooks.test.ts @@ -752,6 +752,38 @@ describe("compactEmbeddedPiSession hooks (ownsCompaction engine)", () => { ); }); + it("passes the rotated session id to engine-owned after_compaction hooks", async () => { + hookRunner.hasHooks.mockReturnValue(true); + const rotatedSessionId = "rotated-session"; + const rotatedSessionFile = "/tmp/rotated-session.jsonl"; + contextEngineCompactMock.mockResolvedValue({ + ok: true, + compacted: true, + reason: undefined, + result: { + summary: "engine-summary", + firstKeptEntryId: "entry-1", + tokensBefore: 120, + tokensAfter: 50, + sessionId: rotatedSessionId, + sessionFile: rotatedSessionFile, + }, + } as never); + + const result = await compactEmbeddedPiSession(wrappedCompactionArgs()); + + expect(result.ok).toBe(true); + expect(hookRunner.runAfterCompaction).toHaveBeenCalledWith( + expect.objectContaining({ + sessionFile: rotatedSessionFile, + }), + expect.objectContaining({ + sessionId: rotatedSessionId, + sessionKey: TEST_SESSION_KEY, + }), + ); + }); + it("emits a transcript update and post-compaction memory sync on the engine-owned path", async () => { const listener = vi.fn(); const cleanup = onSessionTranscriptUpdate(listener); @@ -924,6 +956,58 @@ describe("compactEmbeddedPiSession hooks (ownsCompaction engine)", () => { } }); + it("reuses a delegated compaction successor transcript", async () => { + const maintain = vi.fn(async (_params?: unknown) => ({ + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + })); + const delegatedSessionId = "delegated-session"; + const delegatedSessionFile = "/tmp/delegated-session.jsonl"; + resolveContextEngineMock.mockResolvedValue({ + info: { ownsCompaction: false }, + compact: contextEngineCompactMock, + maintain, + } as never); + contextEngineCompactMock.mockResolvedValue({ + ok: true, + compacted: true, + reason: undefined, + result: { + summary: "engine-summary", + firstKeptEntryId: "entry-1", + tokensBefore: 120, + tokensAfter: 50, + sessionId: delegatedSessionId, + sessionFile: delegatedSessionFile, + }, + } as never); + + const result = await compactEmbeddedPiSession( + wrappedCompactionArgs({ + config: { + agents: { + defaults: { + compaction: { + truncateAfterCompaction: true, + }, + }, + }, + }, + }), + ); + + expect(result.ok).toBe(true); + expect(result.result?.sessionId).toBe(delegatedSessionId); + expect(result.result?.sessionFile).toBe(delegatedSessionFile); + expect(maintain).toHaveBeenCalledWith( + expect.objectContaining({ + sessionId: delegatedSessionId, + sessionFile: delegatedSessionFile, + }), + ); + }); + it("catches and logs hook exceptions without aborting compaction", async () => { hookRunner.hasHooks.mockReturnValue(true); hookRunner.runBeforeCompaction.mockRejectedValue(new Error("hook boom")); diff --git a/src/agents/pi-embedded-runner/compact.queued.ts b/src/agents/pi-embedded-runner/compact.queued.ts index cdca8b0b503..1efc1984d7c 100644 --- a/src/agents/pi-embedded-runner/compact.queued.ts +++ b/src/agents/pi-embedded-runner/compact.queued.ts @@ -26,6 +26,10 @@ import { buildEmbeddedCompactionRuntimeContext, resolveEmbeddedCompactionTarget, } from "./compaction-runtime-context.js"; +import { + rotateTranscriptAfterCompaction, + shouldRotateCompactionTranscript, +} from "./compaction-successor-transcript.js"; import { runContextEngineMaintenance } from "./context-engine-maintenance.js"; import { resolveGlobalLane, resolveSessionLane } from "./lanes.js"; import { log } from "./logger.js"; @@ -158,15 +162,44 @@ export async function compactEmbeddedPiSession( force: params.trigger === "manual", runtimeContext, }); + const delegatedSessionId = result.result?.sessionId; + const delegatedSessionFile = result.result?.sessionFile; + const delegatedRotatedTranscript = Boolean(delegatedSessionId || delegatedSessionFile); + let postCompactionSessionId = delegatedSessionId ?? params.sessionId; + let postCompactionSessionFile = delegatedSessionFile ?? params.sessionFile; + let postCompactionLeafId: string | undefined; if (result.ok && result.compacted) { + if (shouldRotateCompactionTranscript(params.config) && !delegatedRotatedTranscript) { + try { + const rotation = await rotateTranscriptAfterCompaction({ + sessionManager: SessionManager.open(params.sessionFile), + sessionFile: params.sessionFile, + }); + if (rotation.rotated) { + postCompactionSessionId = rotation.sessionId ?? postCompactionSessionId; + postCompactionSessionFile = rotation.sessionFile ?? postCompactionSessionFile; + postCompactionLeafId = rotation.leafId; + log.info( + `[compaction] rotated active transcript after context-engine compaction ` + + `(sessionKey=${params.sessionKey ?? params.sessionId})`, + ); + } + } catch (err) { + log.warn("failed to rotate compacted transcript", { + errorMessage: formatErrorMessage(err), + }); + } + } if (params.config && params.sessionKey && checkpointSnapshot) { try { - const postCompactionSession = SessionManager.open(params.sessionFile); - const postLeafId = postCompactionSession.getLeafId() ?? undefined; + const postLeafId = + postCompactionLeafId ?? + SessionManager.open(postCompactionSessionFile).getLeafId() ?? + undefined; const storedCheckpoint = await persistSessionCompactionCheckpoint({ cfg: params.config, sessionKey: params.sessionKey, - sessionId: params.sessionId, + sessionId: postCompactionSessionId, reason: resolveSessionCompactionCheckpointReason({ trigger: params.trigger, }), @@ -175,7 +208,7 @@ export async function compactEmbeddedPiSession( firstKeptEntryId: result.result?.firstKeptEntryId, tokensBefore: result.result?.tokensBefore, tokensAfter: result.result?.tokensAfter, - postSessionFile: params.sessionFile, + postSessionFile: postCompactionSessionFile, postLeafId, postEntryId: postLeafId, }); @@ -188,9 +221,9 @@ export async function compactEmbeddedPiSession( } await runContextEngineMaintenance({ contextEngine, - sessionId: params.sessionId, + sessionId: postCompactionSessionId, sessionKey: params.sessionKey, - sessionFile: params.sessionFile, + sessionFile: postCompactionSessionFile, reason: "compaction", runtimeContext, }); @@ -199,7 +232,7 @@ export async function compactEmbeddedPiSession( await runPostCompactionSideEffects({ config: params.config, sessionKey: params.sessionKey, - sessionFile: params.sessionFile, + sessionFile: postCompactionSessionFile, }); } if ( @@ -209,14 +242,18 @@ export async function compactEmbeddedPiSession( hookRunner.runAfterCompaction ) { try { + const afterHookCtx = { + ...hookCtx, + sessionId: postCompactionSessionId, + }; await hookRunner.runAfterCompaction( { messageCount: -1, compactedCount: -1, tokenCount: result.result?.tokensAfter, - sessionFile: params.sessionFile, + sessionFile: postCompactionSessionFile, }, - hookCtx, + afterHookCtx, ); } catch (err) { log.warn("after_compaction hook failed", { @@ -235,6 +272,12 @@ export async function compactEmbeddedPiSession( tokensBefore: result.result.tokensBefore, tokensAfter: result.result.tokensAfter, details: result.result.details, + ...(postCompactionSessionId !== params.sessionId + ? { sessionId: postCompactionSessionId } + : {}), + ...(postCompactionSessionFile !== params.sessionFile + ? { sessionFile: postCompactionSessionFile } + : {}), } : undefined, }; diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index 43688dda31c..085b3fe9b81 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -19,7 +19,6 @@ import { type CapturedCompactionCheckpointSnapshot, } from "../../gateway/session-compaction-checkpoints.js"; import { formatErrorMessage } from "../../infra/errors.js"; -import { resolveHeartbeatSummaryForAgent } from "../../infra/heartbeat-summary.js"; import { getMachineDisplayName } from "../../infra/machine-name.js"; import { generateSecureToken } from "../../infra/secure-random.js"; import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js"; @@ -113,6 +112,11 @@ import { compactWithSafetyTimeout, resolveCompactionTimeoutMs, } from "./compaction-safety-timeout.js"; +import { + type CompactionTranscriptRotation, + rotateTranscriptAfterCompaction, + shouldRotateCompactionTranscript, +} from "./compaction-successor-transcript.js"; import { applyFinalEffectiveToolPolicy } from "./effective-tool-policy.js"; import { buildEmbeddedExtensionFactories } from "./extensions.js"; import { applyExtraParamsToAgent } from "./extra-params.js"; @@ -126,7 +130,6 @@ import { sanitizeSessionHistory, validateReplayTurns } from "./replay-history.js import { shouldUseOpenAIWebSocketTransport } from "./run/attempt.thread-helpers.js"; import { buildEmbeddedSandboxInfo } from "./sandbox-info.js"; import { prewarmSessionFile, trackSessionManagerAccess } from "./session-manager-cache.js"; -import { truncateSessionAfterCompaction } from "./session-truncation.js"; import { resolveEmbeddedRunSkillEntries } from "./skills-runtime.js"; import { resolveEmbeddedAgentApiKey, @@ -1080,6 +1083,7 @@ export async function compactEmbeddedPiSessionDirect( typeof sessionManager.getLeafId === "function" ? (sessionManager.getLeafId() ?? undefined) : undefined; + let transcriptRotationSessionManager = sessionManager; if (params.trigger === "manual") { try { const hardenedBoundary = await hardenManualCompactionBoundary({ @@ -1092,6 +1096,7 @@ export async function compactEmbeddedPiSessionDirect( hardenedBoundary.firstKeptEntryId ?? effectiveFirstKeptEntryId; postCompactionLeafId = hardenedBoundary.leafId ?? postCompactionLeafId; session.agent.state.messages = hardenedBoundary.messages; + transcriptRotationSessionManager = SessionManager.open(params.sessionFile); } } catch (err) { log.warn("[compaction] failed to harden manual compaction boundary", { @@ -1108,12 +1113,40 @@ export async function compactEmbeddedPiSessionDirect( }); const messageCountAfter = session.messages.length; const compactedCount = Math.max(0, messageCountCompactionInput - messageCountAfter); + let transcriptRotation: CompactionTranscriptRotation = { rotated: false }; + if (shouldRotateCompactionTranscript(params.config)) { + try { + transcriptRotation = await rotateTranscriptAfterCompaction({ + sessionManager: transcriptRotationSessionManager, + sessionFile: params.sessionFile, + }); + } catch (err) { + log.warn("[compaction] post-compaction transcript rotation failed", { + errorMessage: formatErrorMessage(err), + errorStack: err instanceof Error ? err.stack : undefined, + }); + } + } + const activeSessionId = transcriptRotation.sessionId ?? params.sessionId; + const activeSessionFile = transcriptRotation.sessionFile ?? params.sessionFile; + const activePostLeafId = transcriptRotation.leafId ?? postCompactionLeafId; + if (transcriptRotation.rotated) { + log.info( + `[compaction] rotated active transcript after compaction ` + + `(sessionKey=${params.sessionKey ?? params.sessionId})`, + ); + await runPostCompactionSideEffects({ + config: params.config, + sessionKey: params.sessionKey, + sessionFile: activeSessionFile, + }); + } if (params.config && params.sessionKey && checkpointSnapshot) { try { const storedCheckpoint = await persistSessionCompactionCheckpoint({ cfg: params.config, sessionKey: params.sessionKey, - sessionId: params.sessionId, + sessionId: activeSessionId, reason: resolveSessionCompactionCheckpointReason({ trigger: params.trigger, }), @@ -1122,9 +1155,9 @@ export async function compactEmbeddedPiSessionDirect( firstKeptEntryId: effectiveFirstKeptEntryId, tokensBefore: observedTokenCount ?? result.tokensBefore, tokensAfter, - postSessionFile: params.sessionFile, - postLeafId: postCompactionLeafId, - postEntryId: postCompactionLeafId, + postSessionFile: activeSessionFile, + postLeafId: activePostLeafId, + postEntryId: activePostLeafId, createdAt: compactStartedAt, }); checkpointSnapshotRetained = storedCheckpoint !== null; @@ -1153,7 +1186,7 @@ export async function compactEmbeddedPiSessionDirect( } await runAfterCompactionHooks({ hookRunner, - sessionId: params.sessionId, + sessionId: activeSessionId, sessionAgentId, hookSessionKey, missingSessionKey, @@ -1162,36 +1195,11 @@ export async function compactEmbeddedPiSessionDirect( messageCountAfter, tokensAfter, compactedCount, - sessionFile: params.sessionFile, + sessionFile: activeSessionFile, summaryLength: typeof result.summary === "string" ? result.summary.length : undefined, tokensBefore: result.tokensBefore, firstKeptEntryId: effectiveFirstKeptEntryId, }); - // Truncate session file to remove compacted entries (#39953) - if (params.config?.agents?.defaults?.compaction?.truncateAfterCompaction) { - try { - const heartbeatSummary = resolveHeartbeatSummaryForAgent( - params.config, - sessionAgentId, - ); - const truncResult = await truncateSessionAfterCompaction({ - sessionFile: params.sessionFile, - ackMaxChars: heartbeatSummary.ackMaxChars, - heartbeatPrompt: heartbeatSummary.prompt, - }); - if (truncResult.truncated) { - log.info( - `[compaction] post-compaction truncation removed ${truncResult.entriesRemoved} entries ` + - `(sessionKey=${params.sessionKey ?? params.sessionId})`, - ); - } - } catch (err) { - log.warn("[compaction] post-compaction truncation failed", { - errorMessage: formatErrorMessage(err), - errorStack: err instanceof Error ? err.stack : undefined, - }); - } - } return { ok: true, compacted: true, @@ -1201,6 +1209,8 @@ export async function compactEmbeddedPiSessionDirect( tokensBefore: observedTokenCount ?? result.tokensBefore, tokensAfter, details: result.details, + sessionId: transcriptRotation.sessionId, + sessionFile: transcriptRotation.sessionFile, }, }; } catch (err) { diff --git a/src/agents/pi-embedded-runner/compaction-successor-transcript.test.ts b/src/agents/pi-embedded-runner/compaction-successor-transcript.test.ts new file mode 100644 index 00000000000..c0f5f7e1f9a --- /dev/null +++ b/src/agents/pi-embedded-runner/compaction-successor-transcript.test.ts @@ -0,0 +1,177 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { SessionManager } from "@mariozechner/pi-coding-agent"; +import { afterEach, describe, expect, it } from "vitest"; +import { makeAgentAssistantMessage } from "../test-helpers/agent-message-fixtures.js"; +import { + rotateTranscriptAfterCompaction, + shouldRotateCompactionTranscript, +} from "./compaction-successor-transcript.js"; +import { hardenManualCompactionBoundary } from "./manual-compaction-boundary.js"; + +let tmpDir: string | undefined; + +async function createTmpDir(): Promise { + tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "compaction-successor-test-")); + return tmpDir; +} + +afterEach(async () => { + if (tmpDir) { + await fs.rm(tmpDir, { recursive: true, force: true }).catch(() => undefined); + tmpDir = undefined; + } +}); + +function makeAssistant(text: string, timestamp: number) { + return makeAgentAssistantMessage({ + content: [{ type: "text", text }], + timestamp, + }); +} + +function createCompactedSession(sessionDir: string): { + manager: SessionManager; + sessionFile: string; + firstKeptId: string; + oldUserId: string; +} { + const manager = SessionManager.create(sessionDir, sessionDir); + manager.appendModelChange("openai", "gpt-5.2"); + manager.appendThinkingLevelChange("medium"); + manager.appendCustomEntry("test-extension", { cursor: "before-compaction" }); + const oldUserId = manager.appendMessage({ role: "user", content: "old user", timestamp: 1 }); + manager.appendLabelChange(oldUserId, "old bookmark"); + manager.appendMessage(makeAssistant("old assistant", 2)); + const firstKeptId = manager.appendMessage({ role: "user", content: "kept user", timestamp: 3 }); + manager.appendLabelChange(firstKeptId, "kept bookmark"); + manager.appendMessage(makeAssistant("kept assistant", 4)); + manager.appendCompaction("Summary of old user and old assistant.", firstKeptId, 5000); + manager.appendMessage({ role: "user", content: "post user", timestamp: 5 }); + manager.appendMessage(makeAssistant("post assistant", 6)); + return { manager, sessionFile: manager.getSessionFile()!, firstKeptId, oldUserId }; +} + +describe("rotateTranscriptAfterCompaction", () => { + it("creates a compacted successor transcript and leaves the archive untouched", async () => { + const dir = await createTmpDir(); + const { manager, sessionFile, firstKeptId, oldUserId } = createCompactedSession(dir); + const originalBytes = await fs.readFile(sessionFile, "utf8"); + const originalEntryCount = manager.getEntries().length; + + const result = await rotateTranscriptAfterCompaction({ + sessionManager: manager, + sessionFile, + now: () => new Date("2026-04-27T12:00:00.000Z"), + }); + + expect(result.rotated).toBe(true); + expect(result.sessionId).toBeTruthy(); + expect(result.sessionFile).toBeTruthy(); + expect(result.sessionFile).not.toBe(sessionFile); + expect(await fs.readFile(sessionFile, "utf8")).toBe(originalBytes); + + const successor = SessionManager.open(result.sessionFile!); + expect(successor.getHeader()).toMatchObject({ + id: result.sessionId, + parentSession: sessionFile, + cwd: dir, + }); + expect(successor.getEntries().length).toBeLessThan(originalEntryCount); + expect(successor.getBranch()[0]?.type).toBe("model_change"); + expect(successor.getBranch()).toContainEqual( + expect.objectContaining({ + type: "custom", + customType: "test-extension", + data: { cursor: "before-compaction" }, + }), + ); + + const context = successor.buildSessionContext(); + const contextText = JSON.stringify(context.messages); + expect(contextText).toContain("Summary of old user and old assistant."); + expect(contextText).toContain("kept user"); + expect(contextText).toContain("post assistant"); + expect( + context.messages.some((message) => message.role === "user" && message.content === "old user"), + ).toBe(false); + expect(context.model?.provider).toBe("openai"); + expect(context.thinkingLevel).toBe("medium"); + expect(successor.getLabel(firstKeptId)).toBe("kept bookmark"); + expect(successor.getLabel(oldUserId)).toBeUndefined(); + }); + + it("skips sessions with no compaction entry", async () => { + const dir = await createTmpDir(); + const manager = SessionManager.create(dir, dir); + manager.appendMessage({ role: "user", content: "hello", timestamp: 1 }); + manager.appendMessage(makeAssistant("hi", 2)); + + const result = await rotateTranscriptAfterCompaction({ + sessionManager: manager, + sessionFile: manager.getSessionFile()!, + }); + + expect(result).toMatchObject({ + rotated: false, + reason: "no compaction entry", + }); + }); + + it("uses a refreshed manager after manual boundary hardening", async () => { + const dir = await createTmpDir(); + const manager = SessionManager.create(dir, dir); + manager.appendMessage({ role: "user", content: "old question", timestamp: 1 }); + manager.appendMessage(makeAssistant("old answer", 2)); + const recentTailId = manager.appendMessage({ + role: "user", + content: "recent question", + timestamp: 3, + }); + manager.appendMessage(makeAssistant("detailed recent answer", 4)); + const compactionId = manager.appendCompaction("fresh manual summary", recentTailId, 200); + const sessionFile = manager.getSessionFile(); + expect(sessionFile).toBeTruthy(); + const staleManager = SessionManager.open(sessionFile!); + + const hardened = await hardenManualCompactionBoundary({ sessionFile: sessionFile! }); + expect(hardened.applied).toBe(true); + const staleLeaf = staleManager.getLeafEntry(); + expect(staleLeaf?.type).toBe("compaction"); + if (!staleLeaf || staleLeaf.type !== "compaction") { + throw new Error("expected stale leaf to be a compaction entry"); + } + expect(staleLeaf.firstKeptEntryId).toBe(recentTailId); + + const result = await rotateTranscriptAfterCompaction({ + sessionManager: SessionManager.open(sessionFile!), + sessionFile: sessionFile!, + now: () => new Date("2026-04-27T12:30:00.000Z"), + }); + + expect(result.rotated).toBe(true); + const successor = SessionManager.open(result.sessionFile!); + const successorText = JSON.stringify(successor.buildSessionContext().messages); + expect(successorText).toContain("fresh manual summary"); + expect(successorText).not.toContain("recent question"); + expect(successorText).not.toContain("detailed recent answer"); + const successorCompaction = successor + .getEntries() + .find((entry) => entry.type === "compaction" && entry.id === compactionId); + expect(successorCompaction).toMatchObject({ + firstKeptEntryId: compactionId, + }); + }); +}); + +describe("shouldRotateCompactionTranscript", () => { + it("keeps transcript rotation opt-in behind the existing config key", () => { + expect(shouldRotateCompactionTranscript()).toBe(false); + expect( + shouldRotateCompactionTranscript({ + agents: { defaults: { compaction: { truncateAfterCompaction: true } } }, + }), + ).toBe(true); + }); +}); diff --git a/src/agents/pi-embedded-runner/compaction-successor-transcript.ts b/src/agents/pi-embedded-runner/compaction-successor-transcript.ts new file mode 100644 index 00000000000..ce701c0b16a --- /dev/null +++ b/src/agents/pi-embedded-runner/compaction-successor-transcript.ts @@ -0,0 +1,206 @@ +import { randomUUID } from "node:crypto"; +import fs from "node:fs/promises"; +import path from "node:path"; +import { + CURRENT_SESSION_VERSION, + SessionManager, + type CompactionEntry, + type SessionEntry, + type SessionHeader, +} from "@mariozechner/pi-coding-agent"; +import type { OpenClawConfig } from "../../config/types.openclaw.js"; + +type ReadonlySessionManagerForRotation = Pick< + SessionManager, + "buildSessionContext" | "getBranch" | "getCwd" | "getHeader" +>; + +export type CompactionTranscriptRotation = { + rotated: boolean; + reason?: string; + sessionId?: string; + sessionFile?: string; + compactionEntryId?: string; + leafId?: string; + entriesWritten?: number; +}; + +export function shouldRotateCompactionTranscript(config?: OpenClawConfig): boolean { + return config?.agents?.defaults?.compaction?.truncateAfterCompaction === true; +} + +export async function rotateTranscriptAfterCompaction(params: { + sessionManager: ReadonlySessionManagerForRotation; + sessionFile: string; + now?: () => Date; +}): Promise { + const sessionFile = params.sessionFile.trim(); + if (!sessionFile) { + return { rotated: false, reason: "missing session file" }; + } + + const branch = params.sessionManager.getBranch(); + const latestCompactionIndex = findLatestCompactionIndex(branch); + if (latestCompactionIndex < 0) { + return { rotated: false, reason: "no compaction entry" }; + } + + const compaction = branch[latestCompactionIndex] as CompactionEntry; + const timestamp = (params.now?.() ?? new Date()).toISOString(); + const sessionId = randomUUID(); + const successorFile = resolveSuccessorSessionFile({ + sessionFile, + sessionId, + timestamp, + }); + const successorEntries = buildSuccessorEntries({ + branch, + latestCompactionIndex, + }); + if (successorEntries.length === 0) { + return { rotated: false, reason: "empty successor transcript" }; + } + + const header = buildSuccessorHeader({ + previousHeader: params.sessionManager.getHeader(), + sessionId, + timestamp, + cwd: params.sessionManager.getCwd(), + parentSession: sessionFile, + }); + await writeSessionFileAtomic(successorFile, [header, ...successorEntries]); + + try { + SessionManager.open(successorFile).buildSessionContext(); + } catch (err) { + await fs.unlink(successorFile).catch(() => undefined); + throw err; + } + + return { + rotated: true, + sessionId, + sessionFile: successorFile, + compactionEntryId: compaction.id, + leafId: successorEntries[successorEntries.length - 1]?.id, + entriesWritten: successorEntries.length, + }; +} + +function findLatestCompactionIndex(entries: SessionEntry[]): number { + for (let index = entries.length - 1; index >= 0; index -= 1) { + if (entries[index]?.type === "compaction") { + return index; + } + } + return -1; +} + +function buildSuccessorEntries(params: { + branch: SessionEntry[]; + latestCompactionIndex: number; +}): SessionEntry[] { + const { branch, latestCompactionIndex } = params; + const compaction = branch[latestCompactionIndex] as CompactionEntry; + const firstKeptIndex = branch.findIndex((entry) => entry.id === compaction.firstKeptEntryId); + const keptBeforeCompaction = + firstKeptIndex >= 0 && firstKeptIndex < latestCompactionIndex + ? branch.slice(firstKeptIndex, latestCompactionIndex) + : []; + const afterCompaction = branch.slice(latestCompactionIndex + 1); + const statePrefix = collectLatestStatePrefix(branch.slice(0, latestCompactionIndex)); + const successorEntries: SessionEntry[] = []; + const seenIds = new Set(); + let parentId: string | null = null; + + const append = (entry: SessionEntry) => { + if (seenIds.has(entry.id)) { + return; + } + const nextEntry = { ...entry, parentId } as SessionEntry; + successorEntries.push(nextEntry); + seenIds.add(nextEntry.id); + parentId = nextEntry.id; + }; + + for (const entry of statePrefix) { + append(entry); + } + append(compaction); + for (const entry of [...keptBeforeCompaction, ...afterCompaction]) { + if (entry.type === "compaction" || entry.type === "label") { + continue; + } + append(entry); + } + const retainedIds = new Set(successorEntries.map((entry) => entry.id)); + for (const entry of branch) { + if (entry.type !== "label" || !retainedIds.has(entry.targetId)) { + continue; + } + append(entry); + } + return successorEntries; +} + +function collectLatestStatePrefix(entries: SessionEntry[]): SessionEntry[] { + const customEntries: Array<{ index: number; entry: SessionEntry }> = []; + const latestByType = new Map(); + for (const [index, entry] of entries.entries()) { + if (entry.type === "custom") { + customEntries.push({ index, entry }); + } else if ( + entry.type === "thinking_level_change" || + entry.type === "model_change" || + entry.type === "session_info" + ) { + latestByType.set(entry.type, { index, entry }); + } + } + return [...customEntries, ...latestByType.values()] + .toSorted((left, right) => left.index - right.index) + .map(({ entry }) => entry); +} + +function buildSuccessorHeader(params: { + previousHeader: SessionHeader | null; + sessionId: string; + timestamp: string; + cwd: string; + parentSession: string; +}): SessionHeader { + return { + type: "session", + version: CURRENT_SESSION_VERSION, + id: params.sessionId, + timestamp: params.timestamp, + cwd: params.previousHeader?.cwd || params.cwd, + parentSession: params.parentSession, + }; +} + +function resolveSuccessorSessionFile(params: { + sessionFile: string; + sessionId: string; + timestamp: string; +}): string { + const fileTimestamp = params.timestamp.replace(/[:.]/g, "-"); + return path.join(path.dirname(params.sessionFile), `${fileTimestamp}_${params.sessionId}.jsonl`); +} + +async function writeSessionFileAtomic( + filePath: string, + entries: Array, +) { + const dir = path.dirname(filePath); + await fs.mkdir(dir, { recursive: true }); + const tmpFile = path.join(dir, `.${path.basename(filePath)}.${process.pid}.${randomUUID()}.tmp`); + const content = `${entries.map((entry) => JSON.stringify(entry)).join("\n")}\n`; + try { + await fs.writeFile(tmpFile, content, { encoding: "utf8", flag: "wx" }); + await fs.rename(tmpFile, filePath); + } catch (err) { + await fs.unlink(tmpFile).catch(() => undefined); + throw err; + } +} diff --git a/src/agents/pi-embedded-runner/run.overflow-compaction.fixture.ts b/src/agents/pi-embedded-runner/run.overflow-compaction.fixture.ts index dbebd4004f9..da7e077ff67 100644 --- a/src/agents/pi-embedded-runner/run.overflow-compaction.fixture.ts +++ b/src/agents/pi-embedded-runner/run.overflow-compaction.fixture.ts @@ -13,6 +13,8 @@ export function makeCompactionSuccess(params: { firstKeptEntryId?: string; tokensBefore?: number; tokensAfter?: number; + sessionId?: string; + sessionFile?: string; }) { return { ok: true as const, @@ -22,6 +24,8 @@ export function makeCompactionSuccess(params: { ...(params.firstKeptEntryId ? { firstKeptEntryId: params.firstKeptEntryId } : {}), ...(params.tokensBefore !== undefined ? { tokensBefore: params.tokensBefore } : {}), ...(params.tokensAfter !== undefined ? { tokensAfter: params.tokensAfter } : {}), + ...(params.sessionId !== undefined ? { sessionId: params.sessionId } : {}), + ...(params.sessionFile !== undefined ? { sessionFile: params.sessionFile } : {}), }, }; } @@ -83,6 +87,8 @@ type MockCompactDirect = { firstKeptEntryId?: string; tokensBefore?: number; tokensAfter?: number; + sessionId?: string; + sessionFile?: string; }; }) => unknown; }; diff --git a/src/agents/pi-embedded-runner/run.overflow-compaction.harness.ts b/src/agents/pi-embedded-runner/run.overflow-compaction.harness.ts index 9d3c20ff7ba..61342d7a320 100644 --- a/src/agents/pi-embedded-runner/run.overflow-compaction.harness.ts +++ b/src/agents/pi-embedded-runner/run.overflow-compaction.harness.ts @@ -22,6 +22,8 @@ type MockCompactionResult = firstKeptEntryId?: string; tokensBefore?: number; tokensAfter?: number; + sessionId?: string; + sessionFile?: string; }; reason?: string; } diff --git a/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts b/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts index ae88872fde4..013de551084 100644 --- a/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts +++ b/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts @@ -611,6 +611,42 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => { ); }); + it("retries overflow recovery against the rotated compacted transcript", async () => { + mockedRunEmbeddedAttempt + .mockResolvedValueOnce(makeAttemptResult({ promptError: makeOverflowError() })) + .mockResolvedValueOnce( + makeAttemptResult({ + promptError: null, + sessionIdUsed: "rotated-session", + sessionFileUsed: "/tmp/rotated-session.json", + }), + ); + mockedCompactDirect.mockResolvedValueOnce( + makeCompactionSuccess({ + summary: "rotated overflow compaction", + tokensAfter: 50, + sessionId: "rotated-session", + sessionFile: "/tmp/rotated-session.json", + }), + ); + + await runEmbeddedPiAgent(overflowBaseRunParams); + + expect(mockedRunEmbeddedAttempt).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + sessionId: "rotated-session", + sessionFile: "/tmp/rotated-session.json", + }), + ); + expect(mockedRunContextEngineMaintenance).toHaveBeenCalledWith( + expect.objectContaining({ + sessionId: "rotated-session", + sessionFile: "/tmp/rotated-session.json", + }), + ); + }); + it("guards thrown engine-owned overflow compaction attempts", async () => { mockedContextEngine.info.ownsCompaction = true; mockedGlobalHookRunner.hasHooks.mockImplementation( diff --git a/src/agents/pi-embedded-runner/run.timeout-triggered-compaction.test.ts b/src/agents/pi-embedded-runner/run.timeout-triggered-compaction.test.ts index 31b5ad859c5..ac3c20a9c56 100644 --- a/src/agents/pi-embedded-runner/run.timeout-triggered-compaction.test.ts +++ b/src/agents/pi-embedded-runner/run.timeout-triggered-compaction.test.ts @@ -118,15 +118,30 @@ describe("timeout-triggered compaction", () => { summary: "compacted for timeout", tokensBefore: 160000, tokensAfter: 60000, + sessionId: "timeout-rotated-session", + sessionFile: "/tmp/timeout-rotated-session.json", }), ); // Second attempt succeeds - mockedRunEmbeddedAttempt.mockResolvedValueOnce(makeAttemptResult({ promptError: null })); + mockedRunEmbeddedAttempt.mockResolvedValueOnce( + makeAttemptResult({ + promptError: null, + sessionIdUsed: "timeout-rotated-session", + sessionFileUsed: "/tmp/timeout-rotated-session.json", + }), + ); const result = await runEmbeddedPiAgent(overflowBaseRunParams); // Verify the loop continued (retry happened) expect(mockedRunEmbeddedAttempt).toHaveBeenCalledTimes(2); + expect(mockedRunEmbeddedAttempt).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + sessionId: "timeout-rotated-session", + sessionFile: "/tmp/timeout-rotated-session.json", + }), + ); expect(mockedRunPostCompactionSideEffects).not.toHaveBeenCalled(); expect(result.meta.error).toBeUndefined(); }); diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index 9677e5907e5..4aede7500ee 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -706,6 +706,24 @@ export async function runEmbeddedPiAgent( ensureContextEnginesInitialized(); const contextEngine = await resolveContextEngine(params.config); try { + let activeSessionId = params.sessionId; + let activeSessionFile = params.sessionFile; + const resolveActiveHookContext = () => ({ + ...hookCtx, + sessionId: activeSessionId, + }); + const adoptCompactionTranscript = ( + compactResult: Awaited>, + ) => { + const nextSessionId = compactResult.result?.sessionId; + const nextSessionFile = compactResult.result?.sessionFile; + if (nextSessionId && nextSessionId !== activeSessionId) { + activeSessionId = nextSessionId; + } + if (nextSessionFile && nextSessionFile !== activeSessionFile) { + activeSessionFile = nextSessionFile; + } + }; // When the engine owns compaction, compactEmbeddedPiSessionDirect is // bypassed. Fire lifecycle hooks here so recovery paths still notify // subscribers like memory extensions and usage trackers. @@ -718,8 +736,8 @@ export async function runEmbeddedPiAgent( } try { await hookRunner.runBeforeCompaction( - { messageCount: -1, sessionFile: params.sessionFile }, - hookCtx, + { messageCount: -1, sessionFile: activeSessionFile }, + resolveActiveHookContext(), ); } catch (hookErr) { log.warn(`before_compaction hook failed during ${reason}: ${String(hookErr)}`); @@ -743,9 +761,9 @@ export async function runEmbeddedPiAgent( messageCount: -1, compactedCount: -1, tokenCount: compactResult.result?.tokensAfter, - sessionFile: params.sessionFile, + sessionFile: compactResult.result?.sessionFile ?? activeSessionFile, }, - hookCtx, + resolveActiveHookContext(), ); } catch (hookErr) { log.warn(`after_compaction hook failed during ${reason}: ${String(hookErr)}`); @@ -778,7 +796,7 @@ export async function runEmbeddedPiAgent( profileId: lastProfileId, durationMs: Date.now() - started, agentMeta: buildErrorAgentMeta({ - sessionId: params.sessionId, + sessionId: activeSessionId, provider, model: model.id, contextTokens: ctxInfo.tokens, @@ -836,7 +854,7 @@ export async function runEmbeddedPiAgent( }); const attempt = await runEmbeddedAttemptWithBackend({ - sessionId: params.sessionId, + sessionId: activeSessionId, sessionKey: resolvedSessionKey, sandboxSessionKey: params.sandboxSessionKey, trigger: params.trigger, @@ -862,7 +880,7 @@ export async function runEmbeddedPiAgent( currentMessageId: params.currentMessageId, replyToMode: params.replyToMode, hasRepliedRef: params.hasRepliedRef, - sessionFile: params.sessionFile, + sessionFile: activeSessionFile, workspaceDir: resolvedWorkspace, agentDir, config: params.config, @@ -951,9 +969,16 @@ export async function runEmbeddedPiAgent( idleTimedOut, timedOutDuringCompaction, sessionIdUsed, + sessionFileUsed, lastAssistant: sessionLastAssistant, currentAttemptAssistant, } = attempt; + if (sessionIdUsed && sessionIdUsed !== activeSessionId) { + activeSessionId = sessionIdUsed; + } + if (sessionFileUsed && sessionFileUsed !== activeSessionFile) { + activeSessionFile = sessionFileUsed; + } bootstrapPromptWarningSignaturesSeen = attempt.bootstrapPromptWarningSignaturesSeen ?? (attempt.bootstrapPromptWarningSignature @@ -1096,9 +1121,9 @@ export async function runEmbeddedPiAgent( maxAttempts: MAX_TIMEOUT_COMPACTION_ATTEMPTS, }; timeoutCompactResult = await contextEngine.compact({ - sessionId: params.sessionId, + sessionId: activeSessionId, sessionKey: params.sessionKey, - sessionFile: params.sessionFile, + sessionFile: activeSessionFile, tokenBudget: ctxInfo.tokens, force: true, compactionTarget: "budget", @@ -1114,6 +1139,9 @@ export async function runEmbeddedPiAgent( reason: String(compactErr), }; } + if (timeoutCompactResult.compacted) { + adoptCompactionTranscript(timeoutCompactResult); + } await runOwnsCompactionAfterHook("timeout recovery", timeoutCompactResult); if (timeoutCompactResult.compacted) { autoCompactionCount += 1; @@ -1121,7 +1149,7 @@ export async function runEmbeddedPiAgent( await runPostCompactionSideEffects({ config: params.config, sessionKey: params.sessionKey, - sessionFile: params.sessionFile, + sessionFile: activeSessionFile, }); } log.info( @@ -1165,7 +1193,7 @@ export async function runEmbeddedPiAgent( log.warn( `[context-overflow-diag] sessionKey=${params.sessionKey ?? params.sessionId} ` + `provider=${provider}/${modelId} source=${contextOverflowError.source} ` + - `messages=${msgCount} sessionFile=${params.sessionFile} ` + + `messages=${msgCount} sessionFile=${activeSessionFile} ` + `diagId=${overflowDiagId} compactionAttempts=${overflowCompactionAttempts} ` + `observedTokens=${observedOverflowTokens ?? "unknown"} ` + `error=${errorText.slice(0, 200)}`, @@ -1241,9 +1269,9 @@ export async function runEmbeddedPiAgent( maxAttempts: MAX_OVERFLOW_COMPACTION_ATTEMPTS, }; compactResult = await contextEngine.compact({ - sessionId: params.sessionId, + sessionId: activeSessionId, sessionKey: params.sessionKey, - sessionFile: params.sessionFile, + sessionFile: activeSessionFile, tokenBudget: ctxInfo.tokens, ...(observedOverflowTokens !== undefined ? { currentTokenCount: observedOverflowTokens } @@ -1253,11 +1281,12 @@ export async function runEmbeddedPiAgent( runtimeContext: overflowCompactionRuntimeContext, }); if (compactResult.ok && compactResult.compacted) { + adoptCompactionTranscript(compactResult); await runContextEngineMaintenance({ contextEngine, - sessionId: params.sessionId, + sessionId: activeSessionId, sessionKey: params.sessionKey, - sessionFile: params.sessionFile, + sessionFile: activeSessionFile, reason: "compaction", runtimeContext: overflowCompactionRuntimeContext, }); @@ -1274,16 +1303,17 @@ export async function runEmbeddedPiAgent( } await runOwnsCompactionAfterHook("overflow recovery", compactResult); if (compactResult.compacted) { + adoptCompactionTranscript(compactResult); if (preflightRecovery?.route === "compact_then_truncate") { const truncResult = await truncateOversizedToolResultsInSession({ - sessionFile: params.sessionFile, + sessionFile: activeSessionFile, contextWindowTokens: ctxInfo.tokens, maxCharsOverride: resolveLiveToolResultMaxChars({ contextWindowTokens: ctxInfo.tokens, cfg: params.config, agentId: sessionAgentId, }), - sessionId: params.sessionId, + sessionId: activeSessionId, sessionKey: params.sessionKey, }); if (truncResult.truncated) { @@ -1328,10 +1358,10 @@ export async function runEmbeddedPiAgent( `(contextWindow=${contextWindowTokens} tokens)`, ); const truncResult = await truncateOversizedToolResultsInSession({ - sessionFile: params.sessionFile, + sessionFile: activeSessionFile, contextWindowTokens, maxCharsOverride: toolResultMaxChars, - sessionId: params.sessionId, + sessionId: activeSessionId, sessionKey: params.sessionKey, }); if (truncResult.truncated) { @@ -1782,6 +1812,7 @@ export async function runEmbeddedPiAgent( }); const agentMeta: EmbeddedPiAgentMeta = { sessionId: sessionIdUsed, + sessionFile: sessionFileUsed, provider: sessionLastAssistant?.provider ?? provider, model: sessionLastAssistant?.model ?? model.id, contextTokens: ctxInfo.tokens, diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index e05a9571e70..f7ad939e648 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -232,6 +232,10 @@ import { shouldStripBootstrapFromEmbeddedContext, } from "./attempt-bootstrap-routing.js"; export { shouldStripBootstrapFromEmbeddedContext } from "./attempt-bootstrap-routing.js"; +import { + rotateTranscriptAfterCompaction, + shouldRotateCompactionTranscript, +} from "../compaction-successor-transcript.js"; import { configureEmbeddedAttemptHttpRuntime } from "./attempt-http-runtime.js"; import { assembleAttemptContextEngine, @@ -2170,6 +2174,7 @@ export async function runEmbeddedAttempt( let messagesSnapshot: AgentMessage[] = []; let sessionIdUsed = activeSession.sessionId; + let sessionFileUsed: string | undefined = params.sessionFile; const onAbort = () => { externalAbort = true; const reason = params.abortSignal ? getAbortReason(params.abortSignal) : undefined; @@ -2904,6 +2909,35 @@ export async function runEmbeddedAttempt( } } + if ( + compactionOccurredThisAttempt && + !promptError && + !aborted && + !timedOut && + !idleTimedOut && + !timedOutDuringCompaction && + shouldRotateCompactionTranscript(params.config) + ) { + try { + const rotation = await rotateTranscriptAfterCompaction({ + sessionManager, + sessionFile: params.sessionFile, + }); + if (rotation.rotated) { + sessionIdUsed = rotation.sessionId ?? sessionIdUsed; + sessionFileUsed = rotation.sessionFile ?? sessionFileUsed; + log.info( + `[compaction] rotated active transcript after automatic compaction ` + + `(sessionKey=${params.sessionKey ?? params.sessionId})`, + ); + } + } catch (err) { + log.warn("[compaction] automatic transcript rotation failed", { + errorMessage: formatErrorMessage(err), + }); + } + } + cacheTrace?.recordStage("session:after", { messages: messagesSnapshot, note: timedOutDuringCompaction @@ -3127,6 +3161,7 @@ export async function runEmbeddedAttempt( promptErrorSource, preflightRecovery, sessionIdUsed, + sessionFileUsed, diagnosticTrace, bootstrapPromptWarningSignaturesSeen: bootstrapPromptWarning.warningSignaturesSeen, bootstrapPromptWarningSignature: bootstrapPromptWarning.signature, diff --git a/src/agents/pi-embedded-runner/run/types.ts b/src/agents/pi-embedded-runner/run/types.ts index 29700bf7c73..5f948bb524a 100644 --- a/src/agents/pi-embedded-runner/run/types.ts +++ b/src/agents/pi-embedded-runner/run/types.ts @@ -76,6 +76,7 @@ export type EmbeddedRunAttemptResult = { handled?: false; }; sessionIdUsed: string; + sessionFileUsed?: string; diagnosticTrace?: DiagnosticTraceContext; agentHarnessId?: string; agentHarnessResultClassification?: "empty" | "reasoning-only" | "planning-only"; diff --git a/src/agents/pi-embedded-runner/session-truncation.test.ts b/src/agents/pi-embedded-runner/session-truncation.test.ts deleted file mode 100644 index 1eddf723b65..00000000000 --- a/src/agents/pi-embedded-runner/session-truncation.test.ts +++ /dev/null @@ -1,368 +0,0 @@ -import fs from "node:fs/promises"; -import os from "node:os"; -import path from "node:path"; -import { SessionManager } from "@mariozechner/pi-coding-agent"; -import { afterEach, describe, expect, it } from "vitest"; -import { makeAgentAssistantMessage } from "../test-helpers/agent-message-fixtures.js"; -import { truncateSessionAfterCompaction } from "./session-truncation.js"; - -let tmpDir: string; - -async function createTmpDir(): Promise { - tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "session-truncation-test-")); - return tmpDir; -} - -afterEach(async () => { - if (tmpDir) { - await fs.rm(tmpDir, { recursive: true, force: true }).catch(() => {}); - } -}); - -function makeAssistant(text: string, timestamp: number) { - return makeAgentAssistantMessage({ - content: [{ type: "text", text }], - timestamp, - }); -} - -function createSessionWithCompaction(sessionDir: string): string { - const sm = SessionManager.create(sessionDir, sessionDir); - // Add messages before compaction - sm.appendMessage({ role: "user", content: "hello", timestamp: 1 }); - sm.appendMessage(makeAssistant("hi there", 2)); - sm.appendMessage({ role: "user", content: "do something", timestamp: 3 }); - sm.appendMessage(makeAssistant("done", 4)); - - // Add compaction (summarizing the above) - const branch = sm.getBranch(); - const firstKeptId = branch[branch.length - 1].id; - sm.appendCompaction("Summary of conversation so far.", firstKeptId, 5000); - - // Add messages after compaction - sm.appendMessage({ role: "user", content: "next task", timestamp: 5 }); - sm.appendMessage(makeAssistant("working on it", 6)); - - return sm.getSessionFile()!; -} - -describe("truncateSessionAfterCompaction", () => { - it("removes entries before compaction and keeps entries after (#39953)", async () => { - const dir = await createTmpDir(); - const sessionFile = createSessionWithCompaction(dir); - - // Verify pre-truncation state - const smBefore = SessionManager.open(sessionFile); - const entriesBefore = smBefore.getEntries().length; - expect(entriesBefore).toBeGreaterThan(5); // 4 messages + compaction + 2 messages - - const result = await truncateSessionAfterCompaction({ sessionFile }); - - expect(result.truncated).toBe(true); - expect(result.entriesRemoved).toBeGreaterThan(0); - expect(result.bytesAfter).toBeLessThan(result.bytesBefore!); - - // Verify post-truncation: file is still a valid session - const smAfter = SessionManager.open(sessionFile); - const entriesAfter = smAfter.getEntries().length; - expect(entriesAfter).toBeLessThan(entriesBefore); - - // The branch should contain the firstKeptEntryId message (unsummarized - // tail), compaction, and post-compaction messages - const branchAfter = smAfter.getBranch(); - // The firstKeptEntryId message is preserved as the new root - expect(branchAfter[0].type).toBe("message"); - expect(branchAfter[0].parentId).toBeNull(); - expect(branchAfter[1].type).toBe("compaction"); - - // Session context should still work - const ctx = smAfter.buildSessionContext(); - expect(ctx.messages.length).toBeGreaterThan(0); - }); - - it("skips truncation when no compaction entry exists", async () => { - const dir = await createTmpDir(); - const sm = SessionManager.create(dir, dir); - // appendMessage implicitly creates the session file - sm.appendMessage({ role: "user", content: "hello", timestamp: 1 }); - sm.appendMessage(makeAssistant("hi", 2)); - sm.appendMessage({ role: "user", content: "bye", timestamp: 3 }); - const sessionFile = sm.getSessionFile()!; - - const result = await truncateSessionAfterCompaction({ sessionFile }); - - expect(result.truncated).toBe(false); - expect(result.reason).toBe("no compaction entry found"); - }); - - it("is idempotent — second truncation is a no-op", async () => { - const dir = await createTmpDir(); - const sessionFile = createSessionWithCompaction(dir); - - const first = await truncateSessionAfterCompaction({ sessionFile }); - expect(first.truncated).toBe(true); - - // Run again — no message entries left to remove - const second = await truncateSessionAfterCompaction({ sessionFile }); - expect(second.truncated).toBe(false); - }); - - it("archives original file when archivePath is provided (#39953)", async () => { - const dir = await createTmpDir(); - const sessionFile = createSessionWithCompaction(dir); - const archivePath = path.join(dir, "archive", "backup.jsonl"); - - const result = await truncateSessionAfterCompaction({ sessionFile, archivePath }); - - expect(result.truncated).toBe(true); - const archiveExists = await fs - .stat(archivePath) - .then(() => true) - .catch(() => false); - expect(archiveExists).toBe(true); - - // Archive should be larger than truncated file (it has the full history) - const archiveSize = (await fs.stat(archivePath)).size; - const truncatedSize = (await fs.stat(sessionFile)).size; - expect(archiveSize).toBeGreaterThan(truncatedSize); - }); - - it("handles multiple compaction cycles (#39953)", async () => { - const dir = await createTmpDir(); - const sm = SessionManager.create(dir, dir); - - // First cycle: messages + compaction - sm.appendMessage({ role: "user", content: "cycle 1 message 1", timestamp: 1 }); - sm.appendMessage(makeAssistant("response 1", 2)); - const branch1 = sm.getBranch(); - sm.appendCompaction("Summary of cycle 1.", branch1[branch1.length - 1].id, 3000); - - // Second cycle: more messages + another compaction - sm.appendMessage({ role: "user", content: "cycle 2 message 1", timestamp: 3 }); - sm.appendMessage(makeAssistant("response 2", 4)); - const branch2 = sm.getBranch(); - sm.appendCompaction("Summary of cycles 1 and 2.", branch2[branch2.length - 1].id, 6000); - - // Post-compaction messages - sm.appendMessage({ role: "user", content: "final question", timestamp: 5 }); - - const sessionFile = sm.getSessionFile()!; - const entriesBefore = sm.getEntries().length; - - const result = await truncateSessionAfterCompaction({ sessionFile }); - - expect(result.truncated).toBe(true); - - // Should preserve both compactions (older compactions are non-message state) - // but remove the summarized message entries - const smAfter = SessionManager.open(sessionFile); - const branchAfter = smAfter.getBranch(); - expect(branchAfter[0].type).toBe("compaction"); - - // Both compaction entries are preserved (non-message state is kept) - const compactionEntries = branchAfter.filter((e) => e.type === "compaction"); - expect(compactionEntries).toHaveLength(2); - - // But message entries before the latest compaction were removed - const entriesAfter = smAfter.getEntries().length; - expect(entriesAfter).toBeLessThan(entriesBefore); - - // Only the firstKeptEntryId message should remain before the latest compaction - const latestCompIdx = branchAfter.findIndex( - (e) => e.type === "compaction" && e === compactionEntries[compactionEntries.length - 1], - ); - const messagesBeforeLatest = branchAfter - .slice(0, latestCompIdx) - .filter((e) => e.type === "message"); - expect(messagesBeforeLatest).toHaveLength(1); - }); - - it("preserves non-message session state during truncation", async () => { - const dir = await createTmpDir(); - const sm = SessionManager.create(dir, dir); - - // Messages before compaction - sm.appendMessage({ role: "user", content: "hello", timestamp: 1 }); - sm.appendMessage(makeAssistant("hi", 2)); - - // Non-message state entries interleaved with messages - sm.appendModelChange("anthropic", "claude-sonnet-4-5-20250514"); - sm.appendThinkingLevelChange("high"); - sm.appendCustomEntry("my-extension", { key: "value" }); - sm.appendSessionInfo("my session"); - - sm.appendMessage({ role: "user", content: "do task", timestamp: 3 }); - sm.appendMessage(makeAssistant("done", 4)); - - // Compaction summarizing the conversation - const branch = sm.getBranch(); - const firstKeptId = branch[branch.length - 1].id; - sm.appendCompaction("Summary.", firstKeptId, 5000); - - // Post-compaction messages - sm.appendMessage({ role: "user", content: "next", timestamp: 5 }); - - const sessionFile = sm.getSessionFile()!; - const result = await truncateSessionAfterCompaction({ sessionFile }); - - expect(result.truncated).toBe(true); - - // Verify non-message entries are preserved - const smAfter = SessionManager.open(sessionFile); - const allAfter = smAfter.getEntries(); - const types = allAfter.map((e) => e.type); - - expect(types).toContain("model_change"); - expect(types).toContain("thinking_level_change"); - expect(types).toContain("custom"); - expect(types).toContain("session_info"); - expect(types).toContain("compaction"); - - // Only the firstKeptEntryId message should remain before the compaction - // (all other messages before it were summarized and removed) - const branchAfter = smAfter.getBranch(); - const compIdx = branchAfter.findIndex((e) => e.type === "compaction"); - const msgsBefore = branchAfter.slice(0, compIdx).filter((e) => e.type === "message"); - expect(msgsBefore).toHaveLength(1); - - // Session context should still work - const ctx = smAfter.buildSessionContext(); - expect(ctx.messages.length).toBeGreaterThan(0); - // Non-message state entries are preserved in the truncated file - expect(ctx.model).toBeDefined(); - expect(ctx.thinkingLevel).toBe("high"); - }); - - it("drops label entries whose target message was truncated", async () => { - const dir = await createTmpDir(); - const sm = SessionManager.create(dir, dir); - - // Messages before compaction - sm.appendMessage({ role: "user", content: "hello", timestamp: 1 }); - sm.appendMessage(makeAssistant("hi", 2)); - sm.appendMessage({ role: "user", content: "do task", timestamp: 3 }); - sm.appendMessage(makeAssistant("done", 4)); - - // Capture a pre-compaction message that will be summarized away. - const branch = sm.getBranch(); - const preCompactionMsgId = branch[1].id; // "hi" message - - // Compaction summarizing the conversation - const firstKeptId = branch[branch.length - 1].id; - sm.appendCompaction("Summary.", firstKeptId, 5000); - - // Post-compaction messages - sm.appendMessage({ role: "user", content: "next", timestamp: 5 }); - sm.appendLabelChange(preCompactionMsgId, "my-label"); - - const sessionFile = sm.getSessionFile()!; - const labelEntry = sm.getEntries().find((entry) => entry.type === "label"); - expect(labelEntry?.parentId).not.toBe(preCompactionMsgId); - - const smBefore = SessionManager.open(sessionFile); - expect(smBefore.getLabel(preCompactionMsgId)).toBe("my-label"); - - const result = await truncateSessionAfterCompaction({ sessionFile }); - - expect(result.truncated).toBe(true); - - // Verify label metadata was dropped with the removed target message. - const smAfter = SessionManager.open(sessionFile); - const allAfter = smAfter.getEntries(); - const labels = allAfter.filter((e) => e.type === "label"); - expect(labels).toHaveLength(0); - expect(smAfter.getLabel(preCompactionMsgId)).toBeUndefined(); - }); - - it("preserves the firstKeptEntryId unsummarized tail", async () => { - const dir = await createTmpDir(); - const sm = SessionManager.create(dir, dir); - - // Build a conversation where firstKeptEntryId is NOT the last message - sm.appendMessage({ role: "user", content: "msg1", timestamp: 1 }); - sm.appendMessage(makeAssistant("resp1", 2)); - sm.appendMessage({ role: "user", content: "msg2", timestamp: 3 }); - sm.appendMessage(makeAssistant("resp2", 4)); - - const branch = sm.getBranch(); - // Set firstKeptEntryId to the second message — so msg1 is summarized - // but msg2, resp2, and everything after are the unsummarized tail. - const firstKeptId = branch[1].id; // "resp1" - sm.appendCompaction("Summary of msg1.", firstKeptId, 2000); - - sm.appendMessage({ role: "user", content: "next", timestamp: 5 }); - - const sessionFile = sm.getSessionFile()!; - const result = await truncateSessionAfterCompaction({ sessionFile }); - - expect(result.truncated).toBe(true); - // Only msg1 was summarized (1 entry removed) - expect(result.entriesRemoved).toBe(1); - - // Verify the unsummarized tail is preserved - const smAfter = SessionManager.open(sessionFile); - const branchAfter = smAfter.getBranch(); - const types = branchAfter.map((e) => e.type); - // resp1 (firstKeptEntryId), msg2, resp2, compaction, next - expect(types).toEqual(["message", "message", "message", "compaction", "message"]); - - // buildSessionContext should include the unsummarized tail - const ctx = smAfter.buildSessionContext(); - expect(ctx.messages.length).toBeGreaterThan(2); - }); - - it("preserves unsummarized sibling branches during truncation", async () => { - const dir = await createTmpDir(); - const sm = SessionManager.create(dir, dir); - - // Build main conversation - sm.appendMessage({ role: "user", content: "hello", timestamp: 1 }); - sm.appendMessage(makeAssistant("hi there", 2)); - - // Save a branch point - const branchPoint = sm.getBranch(); - const branchFromId = branchPoint[branchPoint.length - 1].id; - - // Continue main branch - sm.appendMessage({ role: "user", content: "do task A", timestamp: 3 }); - sm.appendMessage(makeAssistant("done A", 4)); - - // Create a sibling branch from the earlier point - sm.branch(branchFromId); - sm.appendMessage({ role: "user", content: "do task B instead", timestamp: 5 }); - const siblingMsg = sm.appendMessage(makeAssistant("done B", 6)); - - // Go back to main branch tip and add compaction there - sm.branch(branchFromId); - sm.appendMessage({ role: "user", content: "do task A", timestamp: 3 }); - sm.appendMessage(makeAssistant("done A take 2", 7)); - const mainBranch = sm.getBranch(); - const firstKeptId = mainBranch[mainBranch.length - 1].id; - sm.appendCompaction("Summary of main branch.", firstKeptId, 5000); - sm.appendMessage({ role: "user", content: "next", timestamp: 8 }); - - const sessionFile = sm.getSessionFile()!; - - const entriesBefore = sm.getEntries(); - - const result = await truncateSessionAfterCompaction({ sessionFile }); - - expect(result.truncated).toBe(true); - - // Verify sibling branch is preserved in the full entry list - const smAfter = SessionManager.open(sessionFile); - const allAfter = smAfter.getEntries(); - - // The sibling branch message should still exist - const siblingAfter = allAfter.find((e) => e.id === siblingMsg); - expect(siblingAfter).toBeDefined(); - - // The tree should have entries from both branches - const tree = smAfter.getTree(); - expect(tree.length).toBeGreaterThan(0); - - // Total entries should be less (main branch messages removed) but not zero - expect(allAfter.length).toBeGreaterThan(0); - expect(allAfter.length).toBeLessThan(entriesBefore.length); - }); -}); diff --git a/src/agents/pi-embedded-runner/session-truncation.ts b/src/agents/pi-embedded-runner/session-truncation.ts deleted file mode 100644 index 00886156094..00000000000 --- a/src/agents/pi-embedded-runner/session-truncation.ts +++ /dev/null @@ -1,252 +0,0 @@ -import fs from "node:fs/promises"; -import path from "node:path"; -import type { CompactionEntry, SessionEntry } from "@mariozechner/pi-coding-agent"; -import { SessionManager } from "@mariozechner/pi-coding-agent"; -import { - isHeartbeatOkResponse, - isHeartbeatUserMessage, -} from "../../auto-reply/heartbeat-filter.js"; -import { formatErrorMessage } from "../../infra/errors.js"; -import { log } from "./logger.js"; - -/** - * Truncate a session JSONL file after compaction by removing only the - * message entries that the compaction actually summarized. - * - * After compaction, the session file still contains all historical entries - * even though `buildSessionContext()` logically skips entries before - * `firstKeptEntryId`. Over many compaction cycles this causes unbounded - * file growth (issue #39953). - * - * This function rewrites the file keeping: - * 1. The session header - * 2. All non-message session state (custom, model_change, thinking_level_change, - * session_info, custom_message, compaction entries) - * Note: label and branch_summary entries referencing removed messages are - * also dropped to avoid dangling metadata. - * 3. All entries from sibling branches not covered by the compaction - * 4. The unsummarized tail: entries from `firstKeptEntryId` through (and - * including) the compaction entry, plus all entries after it - * - * Only `message` entries in the current branch that precede the compaction's - * `firstKeptEntryId` are removed — they are the entries the compaction - * actually summarized. Entries from `firstKeptEntryId` onward are preserved - * because `buildSessionContext()` expects them when reconstructing the - * session. Entries whose parent was removed are re-parented to the nearest - * kept ancestor (or become roots). - */ -export async function truncateSessionAfterCompaction(params: { - sessionFile: string; - /** Optional path to archive the pre-truncation file. */ - archivePath?: string; - ackMaxChars?: number; - heartbeatPrompt?: string; -}): Promise { - const { sessionFile } = params; - - let sm: SessionManager; - try { - sm = SessionManager.open(sessionFile); - } catch (err) { - const reason = formatErrorMessage(err); - log.warn(`[session-truncation] Failed to open session file: ${reason}`); - return { truncated: false, entriesRemoved: 0, reason }; - } - - const header = sm.getHeader(); - if (!header) { - return { truncated: false, entriesRemoved: 0, reason: "missing session header" }; - } - - const branch = sm.getBranch(); - if (branch.length === 0) { - return { truncated: false, entriesRemoved: 0, reason: "empty session" }; - } - - // Find the latest compaction entry in the current branch - let latestCompactionIdx = -1; - for (let i = branch.length - 1; i >= 0; i--) { - if (branch[i].type === "compaction") { - latestCompactionIdx = i; - break; - } - } - - if (latestCompactionIdx < 0) { - return { truncated: false, entriesRemoved: 0, reason: "no compaction entry found" }; - } - - // Nothing to truncate if compaction is already at root - if (latestCompactionIdx === 0) { - return { truncated: false, entriesRemoved: 0, reason: "compaction already at root" }; - } - - // The compaction's firstKeptEntryId marks the start of the "unsummarized - // tail" — entries from firstKeptEntryId through the compaction that - // buildSessionContext() expects to find when reconstructing the session. - // Only entries *before* firstKeptEntryId were actually summarized. - const compactionEntry = branch[latestCompactionIdx] as CompactionEntry; - const { firstKeptEntryId } = compactionEntry; - - // Collect IDs of entries in the current branch that were actually summarized - // (everything before firstKeptEntryId). Entries from firstKeptEntryId through - // the compaction are the unsummarized tail and must be preserved. - const summarizedBranchIds = new Set(); - for (let i = 0; i < latestCompactionIdx; i++) { - if (firstKeptEntryId && branch[i].id === firstKeptEntryId) { - break; // Everything from here to the compaction is the unsummarized tail - } - summarizedBranchIds.add(branch[i].id); - } - - // Operate on the full transcript so sibling branches and tree metadata - // are not silently dropped. - const allEntries = sm.getEntries(); - - // Only remove message-type entries that the compaction actually summarized. - // Non-message session state (custom, model_change, thinking_level_change, - // session_info, custom_message) is preserved even if it sits in the - // summarized portion of the branch. - // - // label and branch_summary entries that reference removed message IDs are - // also dropped to avoid dangling metadata (consistent with the approach in - // tool-result-truncation.ts). - const removedIds = new Set(); - for (const entry of allEntries) { - if (summarizedBranchIds.has(entry.id) && entry.type === "message") { - removedIds.add(entry.id); - } - } - - for (let i = 0; i < branch.length - 1; i++) { - const userEntry = branch[i]; - const assistantEntry = branch[i + 1]; - if ( - userEntry.type === "message" && - assistantEntry.type === "message" && - summarizedBranchIds.has(userEntry.id) && - summarizedBranchIds.has(assistantEntry.id) && - !removedIds.has(userEntry.id) && - !removedIds.has(assistantEntry.id) && - isHeartbeatUserMessage(userEntry.message, params.heartbeatPrompt) && - isHeartbeatOkResponse(assistantEntry.message, params.ackMaxChars) - ) { - removedIds.add(userEntry.id); - removedIds.add(assistantEntry.id); - i++; - } - } - - // Labels bookmark targetId while parentId just records the leaf when the - // label was changed, so targetId determines whether the label is still valid. - // Branch summaries still hang off the summarized branch via parentId. - for (const entry of allEntries) { - if (entry.type === "label" && removedIds.has(entry.targetId)) { - removedIds.add(entry.id); - continue; - } - if ( - entry.type === "branch_summary" && - entry.parentId !== null && - removedIds.has(entry.parentId) - ) { - removedIds.add(entry.id); - } - } - - if (removedIds.size === 0) { - return { truncated: false, entriesRemoved: 0, reason: "no entries to remove" }; - } - - // Build an id→entry map for walking parent chains during re-parenting. - const entryById = new Map(); - for (const entry of allEntries) { - entryById.set(entry.id, entry); - } - - // Keep every entry that was not removed, re-parenting where necessary so - // the tree stays connected. - const keptEntries: SessionEntry[] = []; - for (const entry of allEntries) { - if (removedIds.has(entry.id)) { - continue; - } - - // Walk up the parent chain to find the nearest kept ancestor. - let newParentId = entry.parentId; - while (newParentId !== null && removedIds.has(newParentId)) { - const parent = entryById.get(newParentId); - newParentId = parent?.parentId ?? null; - } - - if (newParentId !== entry.parentId) { - keptEntries.push({ ...entry, parentId: newParentId }); - } else { - keptEntries.push(entry); - } - } - - const entriesRemoved = removedIds.size; - const totalEntriesBefore = allEntries.length; - - // Get file size before truncation - let bytesBefore = 0; - try { - const stat = await fs.stat(sessionFile); - bytesBefore = stat.size; - } catch { - // If stat fails, continue anyway - } - - // Archive original file if requested - if (params.archivePath) { - try { - const archiveDir = path.dirname(params.archivePath); - await fs.mkdir(archiveDir, { recursive: true }); - await fs.copyFile(sessionFile, params.archivePath); - log.info(`[session-truncation] Archived pre-truncation file to ${params.archivePath}`); - } catch (err) { - const reason = formatErrorMessage(err); - log.warn(`[session-truncation] Failed to archive: ${reason}`); - } - } - - // Write truncated file atomically (temp + rename) - const lines: string[] = [JSON.stringify(header), ...keptEntries.map((e) => JSON.stringify(e))]; - const content = lines.join("\n") + "\n"; - - const tmpFile = `${sessionFile}.truncate-tmp`; - try { - await fs.writeFile(tmpFile, content, "utf-8"); - await fs.rename(tmpFile, sessionFile); - } catch (err) { - // Clean up temp file on failure - try { - await fs.unlink(tmpFile); - } catch { - // Ignore cleanup errors - } - const reason = formatErrorMessage(err); - log.warn(`[session-truncation] Failed to write truncated file: ${reason}`); - return { truncated: false, entriesRemoved: 0, reason }; - } - - const bytesAfter = Buffer.byteLength(content, "utf-8"); - - log.info( - `[session-truncation] Truncated session file: ` + - `entriesBefore=${totalEntriesBefore} entriesAfter=${keptEntries.length} ` + - `removed=${entriesRemoved} bytesBefore=${bytesBefore} bytesAfter=${bytesAfter} ` + - `reduction=${bytesBefore > 0 ? ((1 - bytesAfter / bytesBefore) * 100).toFixed(1) : "?"}%`, - ); - - return { truncated: true, entriesRemoved, bytesBefore, bytesAfter }; -} - -export type TruncationResult = { - truncated: boolean; - entriesRemoved: number; - bytesBefore?: number; - bytesAfter?: number; - reason?: string; -}; diff --git a/src/agents/pi-embedded-runner/types.ts b/src/agents/pi-embedded-runner/types.ts index 1b38517285b..b9dd3e73ab3 100644 --- a/src/agents/pi-embedded-runner/types.ts +++ b/src/agents/pi-embedded-runner/types.ts @@ -4,6 +4,7 @@ import type { MessagingToolSend } from "../pi-embedded-messaging.types.js"; export type EmbeddedPiAgentMeta = { sessionId: string; + sessionFile?: string; provider: string; model: string; contextTokens?: number; @@ -174,6 +175,8 @@ export type EmbeddedPiCompactResult = { tokensBefore: number; tokensAfter?: number; details?: unknown; + sessionId?: string; + sessionFile?: string; }; }; diff --git a/src/auto-reply/reply/agent-runner-memory.ts b/src/auto-reply/reply/agent-runner-memory.ts index f7c3fab3ecb..366e6febff7 100644 --- a/src/auto-reply/reply/agent-runner-memory.ts +++ b/src/auto-reply/reply/agent-runner-memory.ts @@ -506,6 +506,8 @@ export async function runPreflightCompactionIfNeeded(params: { sessionKey: params.sessionKey, storePath: params.storePath, tokensAfter: result.result?.tokensAfter, + newSessionId: result.result?.sessionId, + newSessionFile: result.result?.sessionFile, }); await appendPostCompactionRefreshPrompt({ cfg: params.cfg, @@ -749,6 +751,7 @@ export async function runMemoryFlushIfNeeded(params: { .filter(Boolean) .join("\n\n"); let postCompactionSessionId: string | undefined; + let postCompactionSessionFile: string | undefined; try { await memoryDeps.runWithModelFallback({ ...resolveModelFallbackOptions(params.followupRun.run), @@ -791,6 +794,9 @@ export async function runMemoryFlushIfNeeded(params: { if (result.meta?.agentMeta?.sessionId) { postCompactionSessionId = result.meta.agentMeta.sessionId; } + if (result.meta?.agentMeta?.sessionFile) { + postCompactionSessionFile = result.meta.agentMeta.sessionFile; + } bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( result.meta?.systemPromptReport, ); @@ -810,6 +816,7 @@ export async function runMemoryFlushIfNeeded(params: { sessionKey: params.sessionKey, storePath: params.storePath, newSessionId: postCompactionSessionId, + newSessionFile: postCompactionSessionFile, }); const updatedEntry = params.sessionKey ? activeSessionStore?.[params.sessionKey] : undefined; if (updatedEntry) { diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index b0ecc8b5d3b..4a44124cfdd 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -1537,6 +1537,7 @@ export async function runReplyAgent(params: { lastCallUsage: runResult.meta?.agentMeta?.lastCallUsage, contextTokensUsed, newSessionId: runResult.meta?.agentMeta?.sessionId, + newSessionFile: runResult.meta?.agentMeta?.sessionFile, }); const refreshedSessionEntry = sessionKey && activeSessionStore ? activeSessionStore[sessionKey] : undefined; diff --git a/src/auto-reply/reply/commands-compact.ts b/src/auto-reply/reply/commands-compact.ts index faf0e0ebd6a..1a9024164a8 100644 --- a/src/auto-reply/reply/commands-compact.ts +++ b/src/auto-reply/reply/commands-compact.ts @@ -176,6 +176,8 @@ export const handleCompactCommand: CommandHandler = async (params) => { storePath: params.storePath, // Update token counts after compaction tokensAfter: result.result?.tokensAfter, + newSessionId: result.result?.sessionId, + newSessionFile: result.result?.sessionFile, }); } // Use the post-compaction token count for context summary if available diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index 4f0e79a06b8..d27fab7984d 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -455,6 +455,7 @@ export function createFollowupRunner(params: { lastCallUsage: runResult.meta?.agentMeta?.lastCallUsage, contextTokensUsed, newSessionId: runResult.meta?.agentMeta?.sessionId, + newSessionFile: runResult.meta?.agentMeta?.sessionFile, }); const refreshedSessionEntry = sessionKey && sessionStore ? sessionStore[sessionKey] : undefined; diff --git a/src/auto-reply/reply/session-run-accounting.ts b/src/auto-reply/reply/session-run-accounting.ts index 78566dd4a36..75b817655ce 100644 --- a/src/auto-reply/reply/session-run-accounting.ts +++ b/src/auto-reply/reply/session-run-accounting.ts @@ -14,6 +14,7 @@ type IncrementRunCompactionCountParams = Omit< lastCallUsage?: NormalizedUsage; contextTokensUsed?: number; newSessionId?: string; + newSessionFile?: string; }; export async function persistRunSessionUsage(params: PersistRunSessionUsageParams): Promise { @@ -38,5 +39,6 @@ export async function incrementRunCompactionCount( amount: params.amount, tokensAfter: tokensAfterCompaction, newSessionId: params.newSessionId, + newSessionFile: params.newSessionFile, }); } diff --git a/src/auto-reply/reply/session-updates.ts b/src/auto-reply/reply/session-updates.ts index 80361a800f5..47121243b79 100644 --- a/src/auto-reply/reply/session-updates.ts +++ b/src/auto-reply/reply/session-updates.ts @@ -219,6 +219,8 @@ export async function incrementCompactionCount(params: { tokensAfter?: number; /** Session id after compaction, when the runtime rotated transcripts. */ newSessionId?: string; + /** Session file after compaction, when the runtime rotated transcripts. */ + newSessionFile?: string; }): Promise { const { sessionEntry, @@ -230,6 +232,7 @@ export async function incrementCompactionCount(params: { amount = 1, tokensAfter, newSessionId, + newSessionFile, } = params; if (!sessionStore || !sessionKey) { return undefined; @@ -247,12 +250,14 @@ export async function incrementCompactionCount(params: { }; if (newSessionId && newSessionId !== entry.sessionId) { updates.sessionId = newSessionId; - updates.sessionFile = resolveCompactionSessionFile({ - entry, - sessionKey, - storePath, - newSessionId, - }); + updates.sessionFile = + newSessionFile ?? + resolveCompactionSessionFile({ + entry, + sessionKey, + storePath, + newSessionId, + }); } // If tokensAfter is provided, update the cached token counts to reflect post-compaction state if (tokensAfter != null && tokensAfter > 0) { diff --git a/src/cli/update-cli.test.ts b/src/cli/update-cli.test.ts index 700c26e2fea..849b286c4b1 100644 --- a/src/cli/update-cli.test.ts +++ b/src/cli/update-cli.test.ts @@ -381,14 +381,14 @@ describe("update-cli", () => { }; const setupUpdatedRootRefresh = (params?: { - gatewayUpdateImpl?: () => Promise; + gatewayUpdateImpl?: (root: string) => Promise; entrypoints?: string[]; }) => { const root = createCaseDir("openclaw-updated-root"); const entrypoints = params?.entrypoints ?? [path.join(root, "dist", "entry.js")]; pathExists.mockImplementation(async (candidate: string) => entrypoints.includes(candidate)); if (params?.gatewayUpdateImpl) { - vi.mocked(runGatewayUpdate).mockImplementation(params.gatewayUpdateImpl); + vi.mocked(runGatewayUpdate).mockImplementation(() => params.gatewayUpdateImpl!(root)); } else { vi.mocked(runGatewayUpdate).mockResolvedValue({ status: "ok", diff --git a/src/config/schema.base.generated.ts b/src/config/schema.base.generated.ts index 47149742c34..f380f1f6921 100644 --- a/src/config/schema.base.generated.ts +++ b/src/config/schema.base.generated.ts @@ -4990,9 +4990,9 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = { }, truncateAfterCompaction: { type: "boolean", - title: "Truncate After Compaction", + title: "Rotate Transcript After Compaction", description: - "When enabled, rewrites the session JSONL file after compaction to remove entries that were summarized. Prevents unbounded file growth in long-running sessions with many compaction cycles. Default: false.", + "When enabled, rotates the active session JSONL file after compaction so future turns load only the summary and unsummarized tail while the previous full transcript remains archived. Prevents unbounded active transcript growth in long-running sessions. Default: false.", }, notifyUser: { type: "boolean", @@ -26856,8 +26856,8 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = { tags: ["models"], }, "agents.defaults.compaction.truncateAfterCompaction": { - label: "Truncate After Compaction", - help: "When enabled, rewrites the session JSONL file after compaction to remove entries that were summarized. Prevents unbounded file growth in long-running sessions with many compaction cycles. Default: false.", + label: "Rotate Transcript After Compaction", + help: "When enabled, rotates the active session JSONL file after compaction so future turns load only the summary and unsummarized tail while the previous full transcript remains archived. Prevents unbounded active transcript growth in long-running sessions. Default: false.", tags: ["advanced"], }, "agents.defaults.compaction.notifyUser": { diff --git a/src/config/schema.help.ts b/src/config/schema.help.ts index e56682f1eb4..557b638c5cb 100644 --- a/src/config/schema.help.ts +++ b/src/config/schema.help.ts @@ -1266,7 +1266,7 @@ export const FIELD_HELP: Record = { "agents.defaults.compaction.model": "Optional provider/model override used only for compaction summarization. Set this when you want compaction to run on a different model than the session default, and leave it unset to keep using the primary agent model.", "agents.defaults.compaction.truncateAfterCompaction": - "When enabled, rewrites the session JSONL file after compaction to remove entries that were summarized. Prevents unbounded file growth in long-running sessions with many compaction cycles. Default: false.", + "When enabled, rotates the active session JSONL file after compaction so future turns load only the summary and unsummarized tail while the previous full transcript remains archived. Prevents unbounded active transcript growth in long-running sessions. Default: false.", "agents.defaults.compaction.notifyUser": "When enabled, sends brief compaction notices to the user when compaction starts and when it completes (for example, '🧹 Compacting context...' and '🧹 Compaction complete'). Disabled by default to keep compaction silent and non-intrusive.", "agents.defaults.compaction.memoryFlush": diff --git a/src/config/schema.labels.ts b/src/config/schema.labels.ts index 8374c1e185f..8517201d683 100644 --- a/src/config/schema.labels.ts +++ b/src/config/schema.labels.ts @@ -594,7 +594,7 @@ export const FIELD_LABELS: Record = { "agents.defaults.compaction.postCompactionSections": "Post-Compaction Context Sections", "agents.defaults.compaction.timeoutSeconds": "Compaction Timeout (Seconds)", "agents.defaults.compaction.model": "Compaction Model Override", - "agents.defaults.compaction.truncateAfterCompaction": "Truncate After Compaction", + "agents.defaults.compaction.truncateAfterCompaction": "Rotate Transcript After Compaction", "agents.defaults.compaction.notifyUser": "Compaction Notify User", "agents.defaults.compaction.memoryFlush": "Compaction Memory Flush", "agents.defaults.compaction.memoryFlush.enabled": "Compaction Memory Flush Enabled", diff --git a/src/config/types.agent-defaults.ts b/src/config/types.agent-defaults.ts index 5ec5e9606db..7c86706b9e9 100644 --- a/src/config/types.agent-defaults.ts +++ b/src/config/types.agent-defaults.ts @@ -471,8 +471,9 @@ export type AgentCompactionConfig = { */ provider?: string; /** - * Truncate the session JSONL file after compaction to remove entries that - * were summarized. Prevents unbounded file growth in long-running sessions. + * Rotate the active session JSONL file after compaction so the next turn + * starts from the compaction summary and unsummarized tail while the old + * transcript stays archived. * Default: false (existing behavior preserved). */ truncateAfterCompaction?: boolean; diff --git a/src/context-engine/delegate.ts b/src/context-engine/delegate.ts index 379ec7d3d21..c02a65effc6 100644 --- a/src/context-engine/delegate.ts +++ b/src/context-engine/delegate.ts @@ -74,6 +74,8 @@ export async function delegateCompactionToRuntime( tokensBefore: result.result.tokensBefore, tokensAfter: result.result.tokensAfter, details: result.result.details, + sessionId: result.result.sessionId, + sessionFile: result.result.sessionFile, } : undefined, }; diff --git a/src/context-engine/types.ts b/src/context-engine/types.ts index 21cea5aee37..8a7d6e8b6f0 100644 --- a/src/context-engine/types.ts +++ b/src/context-engine/types.ts @@ -22,6 +22,10 @@ export type CompactResult = { tokensBefore: number; tokensAfter?: number; details?: unknown; + /** Session id after compaction, when the runtime rotated transcripts. */ + sessionId?: string; + /** Session file after compaction, when the runtime rotated transcripts. */ + sessionFile?: string; }; }; diff --git a/src/gateway/server-methods/sessions.ts b/src/gateway/server-methods/sessions.ts index ff5c60c6c7f..6f3ee233696 100644 --- a/src/gateway/server-methods/sessions.ts +++ b/src/gateway/server-methods/sessions.ts @@ -1597,6 +1597,12 @@ export const sessionsHandlers: GatewayRequestHandlers = { } entryToUpdate.updatedAt = Date.now(); entryToUpdate.compactionCount = Math.max(0, entryToUpdate.compactionCount ?? 0) + 1; + if (result.result?.sessionId && result.result.sessionId !== entryToUpdate.sessionId) { + entryToUpdate.sessionId = result.result.sessionId; + } + if (result.result?.sessionFile) { + entryToUpdate.sessionFile = result.result.sessionFile; + } delete entryToUpdate.inputTokens; delete entryToUpdate.outputTokens; if ( diff --git a/src/scripts/test-projects.test.ts b/src/scripts/test-projects.test.ts index 324110488de..08390f26be7 100644 --- a/src/scripts/test-projects.test.ts +++ b/src/scripts/test-projects.test.ts @@ -74,6 +74,11 @@ const { args: string[], cwd?: string, listChangedPaths?: (baseRef: string, cwd: string) => string[], + options?: { + cwd?: string; + env?: NodeJS.ProcessEnv; + broad?: boolean; + }, ) => string[] | null; resolveChangedTestTargetPlan: ( changedPaths: string[], @@ -904,13 +909,21 @@ describe("test-projects args", () => { ]); }); - it("keeps extension-facing core contract changes focused by default", () => { + it("routes extension-facing core contract changes and supports broad extension opt-in", () => { const changedPaths = ["src/plugin-sdk/core.ts"]; const plans = buildVitestRunPlans(["--changed=origin/main"], process.cwd(), () => changedPaths); + const targetArgs = resolveChangedTargetArgs( + ["--changed=origin/main"], + process.cwd(), + () => changedPaths, + ); + expect(targetArgs).toEqual(["src/plugin-sdk/core.test.ts"]); expect( - resolveChangedTargetArgs(["--changed=origin/main"], process.cwd(), () => changedPaths), - ).toEqual(["src/plugin-sdk/core.test.ts"]); + resolveChangedTargetArgs(["--changed=origin/main"], process.cwd(), () => changedPaths, { + env: { OPENCLAW_TEST_CHANGED_BROAD: "1" }, + }), + ).toEqual(["src/plugin-sdk/core.test.ts", "extensions"]); expect(plans[0]).toEqual({ config: "test/vitest/vitest.plugin-sdk.config.ts", forwardedArgs: [],