diff --git a/src/agents/pi-embedded-runner/compact.hooks.harness.ts b/src/agents/pi-embedded-runner/compact.hooks.harness.ts index 36057b7ddf6..603b16ad94d 100644 --- a/src/agents/pi-embedded-runner/compact.hooks.harness.ts +++ b/src/agents/pi-embedded-runner/compact.hooks.harness.ts @@ -1,5 +1,6 @@ import { vi, type Mock } from "vitest"; import { clearAgentHarnesses } from "../harness/registry.js"; +import type { CompactionTranscriptRotation } from "./compaction-successor-transcript.js"; type MockResolvedModel = { model: { provider: string; api: string; id: string; input: unknown[] }; @@ -98,6 +99,11 @@ export const resolveAgentTransportOverrideMock: Mock<(params?: unknown) => strin export const resolveSandboxContextMock = vi.fn(async () => null); export const maybeCompactAgentHarnessSessionMock: Mock<(params?: unknown) => Promise> = vi.fn(async () => undefined); +export const rotateTranscriptAfterCompactionMock: Mock< + (_params?: unknown) => Promise +> = vi.fn(async () => ({ + rotated: false, +})); export function resetCompactSessionStateMocks(): void { sanitizeSessionHistoryMock.mockReset(); @@ -138,6 +144,8 @@ export function resetCompactSessionStateMocks(): void { resolveSandboxContextMock.mockResolvedValue(null); maybeCompactAgentHarnessSessionMock.mockReset(); maybeCompactAgentHarnessSessionMock.mockResolvedValue(undefined); + rotateTranscriptAfterCompactionMock.mockReset(); + rotateTranscriptAfterCompactionMock.mockResolvedValue({ rotated: false }); } export function resetCompactHooksHarnessMocks(): void { @@ -209,6 +217,7 @@ export async function loadCompactHooksHarness(): Promise<{ vi.doMock("../../plugins/provider-runtime.js", () => ({ prepareProviderRuntimeAuth: vi.fn(async () => ({ resolvedApiKey: undefined })), + resolveProviderReasoningOutputModeWithPlugin: vi.fn(() => undefined), resolveProviderSystemPromptContribution: vi.fn(() => undefined), resolveProviderTextTransforms: vi.fn(() => undefined), transformProviderSystemPrompt: vi.fn( @@ -264,12 +273,17 @@ export async function loadCompactHooksHarness(): Promise<{ session.messages.splice(1); return await sessionCompactImpl(); }), + setActiveToolsByName: vi.fn(), abortCompaction: sessionAbortCompactionMock, dispose: vi.fn(), }; return { session }; }), - DefaultResourceLoader: function DefaultResourceLoader() {}, + DefaultResourceLoader: function DefaultResourceLoader() { + return { + reload: vi.fn(async () => undefined), + }; + }, SessionManager: { open: vi.fn(() => ({})), }, @@ -287,6 +301,7 @@ export async function loadCompactHooksHarness(): Promise<{ })); vi.doMock("../pi-settings.js", () => ({ + applyPiCompactionSettingsFromConfig: vi.fn(), ensurePiCompactionReserveTokens: vi.fn(), resolveCompactionReserveTokensFloor: vi.fn(() => 0), })); @@ -442,6 +457,16 @@ export async function loadCompactHooksHarness(): Promise<{ resolveCompactionTimeoutMs: vi.fn(() => 30_000), })); + vi.doMock("./compaction-successor-transcript.js", async () => { + const actual = await vi.importActual( + "./compaction-successor-transcript.js", + ); + return { + ...actual, + rotateTranscriptAfterCompaction: rotateTranscriptAfterCompactionMock, + }; + }); + vi.doMock("./wait-for-idle-before-flush.js", () => ({ flushPendingToolResultsAfterIdle: vi.fn(async () => {}), })); @@ -476,6 +501,8 @@ export async function loadCompactHooksHarness(): Promise<{ vi.doMock("../agent-scope.js", () => ({ listAgentEntries: vi.fn(() => []), + resolveAgentConfig: vi.fn(() => undefined), + resolveDefaultAgentId: vi.fn(() => "main"), resolveSessionAgentId: resolveSessionAgentIdMock, resolveSessionAgentIds: vi.fn(() => ({ defaultAgentId: "main", sessionAgentId: "main" })), })); diff --git a/src/agents/pi-embedded-runner/compact.hooks.test.ts b/src/agents/pi-embedded-runner/compact.hooks.test.ts index 4795b8c5ab8..5be6bc0d9a3 100644 --- a/src/agents/pi-embedded-runner/compact.hooks.test.ts +++ b/src/agents/pi-embedded-runner/compact.hooks.test.ts @@ -17,6 +17,7 @@ import { resolveModelMock, resolveSandboxContextMock, resolveSessionAgentIdMock, + rotateTranscriptAfterCompactionMock, resetCompactHooksHarnessMocks, resetCompactSessionStateMocks, sessionAbortCompactionMock, @@ -411,6 +412,49 @@ describe("compactEmbeddedPiSessionDirect hooks", () => { } }); + it("emits post-compaction side effects once for a rotated successor transcript", async () => { + const listener = vi.fn(); + const cleanup = onSessionTranscriptUpdate(listener); + const sync = vi.fn(async () => {}); + getMemorySearchManagerMock.mockResolvedValue({ manager: { sync } }); + rotateTranscriptAfterCompactionMock.mockResolvedValueOnce({ + rotated: true, + sessionId: "rotated-session", + sessionFile: "/tmp/rotated-session.jsonl", + leafId: "rotated-leaf", + }); + + try { + const result = await compactEmbeddedPiSessionDirect({ + sessionId: "session-1", + sessionKey: TEST_SESSION_KEY, + sessionFile: "/tmp/session.jsonl", + workspaceDir: "/tmp/workspace", + config: { + agents: { + defaults: { + compaction: { + truncateAfterCompaction: true, + postIndexSync: "await", + }, + }, + }, + } as never, + }); + + expect(result.ok).toBe(true); + expect(listener).toHaveBeenCalledTimes(1); + expect(listener).toHaveBeenCalledWith({ sessionFile: "/tmp/rotated-session.jsonl" }); + expect(sync).toHaveBeenCalledTimes(1); + expect(sync).toHaveBeenCalledWith({ + reason: "post-compaction", + sessionFiles: ["/tmp/rotated-session.jsonl"], + }); + } finally { + cleanup(); + } + }); + it("preserves tokensAfter when full-session context exceeds result.tokensBefore", async () => { estimateTokensMock.mockImplementation((message: unknown) => { const role = (message as { role?: string }).role; @@ -1008,6 +1052,63 @@ describe("compactEmbeddedPiSession hooks (ownsCompaction engine)", () => { ); }); + it("rotates in the wrapper when a delegated result echoes the current transcript", async () => { + const maintain = vi.fn(async (_params?: unknown) => ({ + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + })); + resolveContextEngineMock.mockResolvedValue({ + info: { ownsCompaction: false }, + compact: contextEngineCompactMock, + maintain, + } as never); + contextEngineCompactMock.mockResolvedValue({ + ok: true, + compacted: true, + reason: undefined, + result: { + summary: "engine-summary", + firstKeptEntryId: "entry-1", + tokensBefore: 120, + tokensAfter: 50, + sessionId: TEST_SESSION_ID, + sessionFile: TEST_SESSION_FILE, + }, + } as never); + rotateTranscriptAfterCompactionMock.mockResolvedValueOnce({ + rotated: true, + sessionId: "wrapper-rotated-session", + sessionFile: "/tmp/wrapper-rotated-session.jsonl", + leafId: "wrapper-rotated-leaf", + }); + + const result = await compactEmbeddedPiSession( + wrappedCompactionArgs({ + config: { + agents: { + defaults: { + compaction: { + truncateAfterCompaction: true, + }, + }, + }, + }, + }), + ); + + expect(result.ok).toBe(true); + expect(rotateTranscriptAfterCompactionMock).toHaveBeenCalledTimes(1); + expect(result.result?.sessionId).toBe("wrapper-rotated-session"); + expect(result.result?.sessionFile).toBe("/tmp/wrapper-rotated-session.jsonl"); + expect(maintain).toHaveBeenCalledWith( + expect.objectContaining({ + sessionId: "wrapper-rotated-session", + sessionFile: "/tmp/wrapper-rotated-session.jsonl", + }), + ); + }); + it("catches and logs hook exceptions without aborting compaction", async () => { hookRunner.hasHooks.mockReturnValue(true); hookRunner.runBeforeCompaction.mockRejectedValue(new Error("hook boom")); diff --git a/src/agents/pi-embedded-runner/compact.queued.ts b/src/agents/pi-embedded-runner/compact.queued.ts index 1efc1984d7c..422d619222e 100644 --- a/src/agents/pi-embedded-runner/compact.queued.ts +++ b/src/agents/pi-embedded-runner/compact.queued.ts @@ -164,7 +164,9 @@ export async function compactEmbeddedPiSession( }); const delegatedSessionId = result.result?.sessionId; const delegatedSessionFile = result.result?.sessionFile; - const delegatedRotatedTranscript = Boolean(delegatedSessionId || delegatedSessionFile); + const delegatedRotatedTranscript = + (typeof delegatedSessionId === "string" && delegatedSessionId !== params.sessionId) || + (typeof delegatedSessionFile === "string" && delegatedSessionFile !== params.sessionFile); let postCompactionSessionId = delegatedSessionId ?? params.sessionId; let postCompactionSessionFile = delegatedSessionFile ?? params.sessionFile; let postCompactionLeafId: string | undefined; diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index 085b3fe9b81..8306f37b3c9 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -1073,11 +1073,6 @@ export async function compactEmbeddedPiSessionDirect( }, }, ); - await runPostCompactionSideEffects({ - config: params.config, - sessionKey: params.sessionKey, - sessionFile: params.sessionFile, - }); let effectiveFirstKeptEntryId = result.firstKeptEntryId; let postCompactionLeafId = typeof sessionManager.getLeafId === "function" @@ -1135,12 +1130,12 @@ export async function compactEmbeddedPiSessionDirect( `[compaction] rotated active transcript after compaction ` + `(sessionKey=${params.sessionKey ?? params.sessionId})`, ); - await runPostCompactionSideEffects({ - config: params.config, - sessionKey: params.sessionKey, - sessionFile: activeSessionFile, - }); } + await runPostCompactionSideEffects({ + config: params.config, + sessionKey: params.sessionKey, + sessionFile: activeSessionFile, + }); if (params.config && params.sessionKey && checkpointSnapshot) { try { const storedCheckpoint = await persistSessionCompactionCheckpoint({ diff --git a/src/agents/pi-embedded-runner/compaction-successor-transcript.test.ts b/src/agents/pi-embedded-runner/compaction-successor-transcript.test.ts index c0f5f7e1f9a..d7fb656a642 100644 --- a/src/agents/pi-embedded-runner/compaction-successor-transcript.test.ts +++ b/src/agents/pi-embedded-runner/compaction-successor-transcript.test.ts @@ -163,6 +163,55 @@ describe("rotateTranscriptAfterCompaction", () => { firstKeptEntryId: compactionId, }); }); + + it("preserves unsummarized sibling branches and branch summaries", async () => { + const dir = await createTmpDir(); + const manager = SessionManager.create(dir, dir); + + manager.appendMessage({ role: "user", content: "hello", timestamp: 1 }); + const branchFromId = manager.appendMessage(makeAssistant("hi there", 2)); + + const branchSummaryId = manager.branchWithSummary( + branchFromId, + "Summary of the abandoned branch.", + ); + const siblingMsgId = manager.appendMessage({ + role: "user", + content: "do task B instead", + timestamp: 3, + }); + manager.appendMessage(makeAssistant("done B", 4)); + + manager.branch(branchFromId); + manager.appendMessage({ role: "user", content: "do task A", timestamp: 5 }); + const firstKeptId = manager.appendMessage(makeAssistant("done A", 6)); + manager.appendCompaction("Summary of main branch.", firstKeptId, 5000); + manager.appendMessage({ role: "user", content: "next", timestamp: 7 }); + + const sessionFile = manager.getSessionFile()!; + const result = await rotateTranscriptAfterCompaction({ + sessionManager: manager, + sessionFile, + now: () => new Date("2026-04-27T12:45:00.000Z"), + }); + + expect(result.rotated).toBe(true); + const successor = SessionManager.open(result.sessionFile!); + const allEntries = successor.getEntries(); + expect(allEntries.find((entry) => entry.id === branchSummaryId)).toMatchObject({ + type: "branch_summary", + summary: "Summary of the abandoned branch.", + }); + expect(allEntries.find((entry) => entry.id === siblingMsgId)).toMatchObject({ + type: "message", + message: expect.objectContaining({ content: "do task B instead" }), + }); + + const activeContextText = JSON.stringify(successor.buildSessionContext().messages); + expect(activeContextText).toContain("Summary of main branch."); + expect(activeContextText).toContain("next"); + expect(activeContextText).not.toContain("do task B instead"); + }); }); describe("shouldRotateCompactionTranscript", () => { diff --git a/src/agents/pi-embedded-runner/compaction-successor-transcript.ts b/src/agents/pi-embedded-runner/compaction-successor-transcript.ts index ce701c0b16a..9deee8f14cc 100644 --- a/src/agents/pi-embedded-runner/compaction-successor-transcript.ts +++ b/src/agents/pi-embedded-runner/compaction-successor-transcript.ts @@ -12,7 +12,7 @@ import type { OpenClawConfig } from "../../config/types.openclaw.js"; type ReadonlySessionManagerForRotation = Pick< SessionManager, - "buildSessionContext" | "getBranch" | "getCwd" | "getHeader" + "buildSessionContext" | "getBranch" | "getCwd" | "getEntries" | "getHeader" >; export type CompactionTranscriptRotation = { @@ -54,6 +54,7 @@ export async function rotateTranscriptAfterCompaction(params: { timestamp, }); const successorEntries = buildSuccessorEntries({ + allEntries: params.sessionManager.getEntries(), branch, latestCompactionIndex, }); @@ -97,69 +98,66 @@ function findLatestCompactionIndex(entries: SessionEntry[]): number { } function buildSuccessorEntries(params: { + allEntries: SessionEntry[]; branch: SessionEntry[]; latestCompactionIndex: number; }): SessionEntry[] { - const { branch, latestCompactionIndex } = params; + const { allEntries, branch, latestCompactionIndex } = params; const compaction = branch[latestCompactionIndex] as CompactionEntry; - const firstKeptIndex = branch.findIndex((entry) => entry.id === compaction.firstKeptEntryId); - const keptBeforeCompaction = - firstKeptIndex >= 0 && firstKeptIndex < latestCompactionIndex - ? branch.slice(firstKeptIndex, latestCompactionIndex) - : []; - const afterCompaction = branch.slice(latestCompactionIndex + 1); - const statePrefix = collectLatestStatePrefix(branch.slice(0, latestCompactionIndex)); - const successorEntries: SessionEntry[] = []; - const seenIds = new Set(); - let parentId: string | null = null; - const append = (entry: SessionEntry) => { - if (seenIds.has(entry.id)) { - return; - } - const nextEntry = { ...entry, parentId } as SessionEntry; - successorEntries.push(nextEntry); - seenIds.add(nextEntry.id); - parentId = nextEntry.id; - }; - - for (const entry of statePrefix) { - append(entry); - } - append(compaction); - for (const entry of [...keptBeforeCompaction, ...afterCompaction]) { - if (entry.type === "compaction" || entry.type === "label") { + const summarizedBranchIds = new Set(); + for (let index = 0; index < latestCompactionIndex; index += 1) { + const entry = branch[index]; + if (!entry) { continue; } - append(entry); + if (compaction.firstKeptEntryId && entry.id === compaction.firstKeptEntryId) { + break; + } + summarizedBranchIds.add(entry.id); } - const retainedIds = new Set(successorEntries.map((entry) => entry.id)); - for (const entry of branch) { - if (entry.type !== "label" || !retainedIds.has(entry.targetId)) { + + const removedIds = new Set(); + for (const entry of allEntries) { + if (summarizedBranchIds.has(entry.id) && entry.type === "message") { + removedIds.add(entry.id); + } + } + for (const entry of allEntries) { + if (entry.type === "label" && removedIds.has(entry.targetId)) { + removedIds.add(entry.id); + } + } + + const entryById = new Map(allEntries.map((entry) => [entry.id, entry])); + const activeBranchIds = new Set(branch.map((entry) => entry.id)); + const keptEntries: SessionEntry[] = []; + for (const entry of allEntries) { + if (removedIds.has(entry.id)) { continue; } - append(entry); - } - return successorEntries; -} -function collectLatestStatePrefix(entries: SessionEntry[]): SessionEntry[] { - const customEntries: Array<{ index: number; entry: SessionEntry }> = []; - const latestByType = new Map(); - for (const [index, entry] of entries.entries()) { - if (entry.type === "custom") { - customEntries.push({ index, entry }); - } else if ( - entry.type === "thinking_level_change" || - entry.type === "model_change" || - entry.type === "session_info" - ) { - latestByType.set(entry.type, { index, entry }); + let parentId = entry.parentId; + while (parentId !== null && removedIds.has(parentId)) { + parentId = entryById.get(parentId)?.parentId ?? null; + } + + keptEntries.push( + parentId === entry.parentId ? entry : ({ ...entry, parentId } as SessionEntry), + ); + } + + const inactiveEntries: SessionEntry[] = []; + const activeEntries: SessionEntry[] = []; + for (const entry of keptEntries) { + if (activeBranchIds.has(entry.id)) { + activeEntries.push(entry); + } else { + inactiveEntries.push(entry); } } - return [...customEntries, ...latestByType.values()] - .toSorted((left, right) => left.index - right.index) - .map(({ entry }) => entry); + + return [...inactiveEntries, ...activeEntries]; } function buildSuccessorHeader(params: { diff --git a/src/auto-reply/reply/reply-state.test.ts b/src/auto-reply/reply/reply-state.test.ts index 6485965b630..3101742b053 100644 --- a/src/auto-reply/reply/reply-state.test.ts +++ b/src/auto-reply/reply/reply-state.test.ts @@ -556,6 +556,31 @@ describe("incrementCompactionCount", () => { expect(stored[sessionKey].compactionCount).toBe(1); }); + it("updates sessionFile when rotation keeps the same sessionId", async () => { + const entry = { + sessionId: "same-id", + sessionFile: "same-id.jsonl", + updatedAt: Date.now(), + compactionCount: 0, + } as SessionEntry; + const { storePath, sessionKey, sessionStore } = await createCompactionSessionFixture(entry); + const rotatedSessionFile = path.join(path.dirname(storePath), "rotated-same-id.jsonl"); + + await incrementCompactionCount({ + sessionEntry: entry, + sessionStore, + sessionKey, + storePath, + newSessionId: "same-id", + newSessionFile: rotatedSessionFile, + }); + + const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(stored[sessionKey].sessionId).toBe("same-id"); + expect(stored[sessionKey].sessionFile).toBe(rotatedSessionFile); + expect(stored[sessionKey].compactionCount).toBe(1); + }); + it("does not update totalTokens when tokensAfter is not provided", async () => { const entry = { sessionId: "s1", diff --git a/src/auto-reply/reply/session-updates.ts b/src/auto-reply/reply/session-updates.ts index 47121243b79..df90d6bb02f 100644 --- a/src/auto-reply/reply/session-updates.ts +++ b/src/auto-reply/reply/session-updates.ts @@ -248,16 +248,23 @@ export async function incrementCompactionCount(params: { compactionCount: nextCount, updatedAt: now, }; - if (newSessionId && newSessionId !== entry.sessionId) { + const explicitNewSessionFile = normalizeOptionalString(newSessionFile); + const sessionIdChanged = Boolean(newSessionId && newSessionId !== entry.sessionId); + const sessionFileChanged = Boolean( + explicitNewSessionFile && explicitNewSessionFile !== entry.sessionFile, + ); + if (sessionIdChanged && newSessionId) { updates.sessionId = newSessionId; updates.sessionFile = - newSessionFile ?? + explicitNewSessionFile ?? resolveCompactionSessionFile({ entry, sessionKey, storePath, newSessionId, }); + } else if (sessionFileChanged && explicitNewSessionFile) { + updates.sessionFile = explicitNewSessionFile; } // If tokensAfter is provided, update the cached token counts to reflect post-compaction state if (tokensAfter != null && tokensAfter > 0) { @@ -281,7 +288,7 @@ export async function incrementCompactionCount(params: { }; }); } - if (newSessionId && newSessionId !== entry.sessionId && cfg) { + if ((sessionIdChanged || sessionFileChanged) && cfg) { emitCompactionSessionLifecycleHooks({ cfg, sessionKey,