From 4af9bf751ff701a9ca96a8bbf89d225bc31d7de8 Mon Sep 17 00:00:00 2001 From: scoootscooob Date: Mon, 6 Apr 2026 16:39:43 -0700 Subject: [PATCH] Gateway: harden manual compaction checkpoints --- src/agents/pi-embedded-runner/compact.ts | 31 ++++- .../manual-compaction-boundary.test.ts | 109 +++++++++++++++++ .../manual-compaction-boundary.ts | 103 ++++++++++++++++ ui/src/ui/controllers/sessions.test.ts | 95 ++++++++++++++- ui/src/ui/controllers/sessions.ts | 115 ++++++++++++++---- 5 files changed, 422 insertions(+), 31 deletions(-) create mode 100644 src/agents/pi-embedded-runner/manual-compaction-boundary.test.ts create mode 100644 src/agents/pi-embedded-runner/manual-compaction-boundary.ts diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index cba7a9d200b..06306b49874 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -115,6 +115,7 @@ import { applyExtraParamsToAgent } from "./extra-params.js"; import { getDmHistoryLimitFromSessionKey, limitHistoryTurns } from "./history.js"; import { resolveGlobalLane, resolveSessionLane } from "./lanes.js"; import { log } from "./logger.js"; +import { hardenManualCompactionBoundary } from "./manual-compaction-boundary.js"; import { buildEmbeddedMessageActionDiscoveryInput } from "./message-action-discovery-input.js"; import { readPiModelContextTokens } from "./model-context-tokens.js"; import { buildModelAliasLines, resolveModelAsync } from "./model.js"; @@ -973,6 +974,28 @@ export async function compactEmbeddedPiSessionDirect( sessionKey: params.sessionKey, sessionFile: params.sessionFile, }); + let effectiveFirstKeptEntryId = result.firstKeptEntryId; + let postCompactionLeafId = + typeof sessionManager.getLeafId === "function" + ? (sessionManager.getLeafId() ?? undefined) + : undefined; + if (params.trigger === "manual") { + try { + const hardenedBoundary = await hardenManualCompactionBoundary({ + sessionFile: params.sessionFile, + }); + if (hardenedBoundary.applied) { + effectiveFirstKeptEntryId = + hardenedBoundary.firstKeptEntryId ?? effectiveFirstKeptEntryId; + postCompactionLeafId = hardenedBoundary.leafId ?? postCompactionLeafId; + session.agent.state.messages = hardenedBoundary.messages; + } + } catch (err) { + log.warn("[compaction] failed to harden manual compaction boundary", { + errorMessage: err instanceof Error ? err.message : String(err), + }); + } + } // Estimate tokens after compaction by summing token estimates for remaining messages const tokensAfter = estimateTokensAfterCompaction({ messagesAfter: session.messages, @@ -984,7 +1007,6 @@ export async function compactEmbeddedPiSessionDirect( const compactedCount = Math.max(0, messageCountCompactionInput - messageCountAfter); if (params.config && params.sessionKey && checkpointSnapshot) { try { - const postCompactionLeafId = sessionManager.getLeafId() ?? undefined; const storedCheckpoint = await persistSessionCompactionCheckpoint({ cfg: params.config, sessionKey: params.sessionKey, @@ -994,7 +1016,7 @@ export async function compactEmbeddedPiSessionDirect( }), snapshot: checkpointSnapshot, summary: result.summary, - firstKeptEntryId: result.firstKeptEntryId, + firstKeptEntryId: effectiveFirstKeptEntryId, tokensBefore: observedTokenCount ?? result.tokensBefore, tokensAfter, postSessionFile: params.sessionFile, @@ -1040,7 +1062,7 @@ export async function compactEmbeddedPiSessionDirect( sessionFile: params.sessionFile, summaryLength: typeof result.summary === "string" ? result.summary.length : undefined, tokensBefore: result.tokensBefore, - firstKeptEntryId: result.firstKeptEntryId, + firstKeptEntryId: effectiveFirstKeptEntryId, }); // Truncate session file to remove compacted entries (#39953) if (params.config?.agents?.defaults?.compaction?.truncateAfterCompaction) { @@ -1072,7 +1094,7 @@ export async function compactEmbeddedPiSessionDirect( compacted: true, result: { summary: result.summary, - firstKeptEntryId: result.firstKeptEntryId, + firstKeptEntryId: effectiveFirstKeptEntryId, tokensBefore: observedTokenCount ?? result.tokensBefore, tokensAfter, details: result.details, @@ -1368,6 +1390,7 @@ export const __testing = { containsRealConversationMessages, estimateTokensAfterCompaction, buildBeforeCompactionHookMetrics, + hardenManualCompactionBoundary, runBeforeCompactionHooks, runAfterCompactionHooks, runPostCompactionSideEffects, diff --git a/src/agents/pi-embedded-runner/manual-compaction-boundary.test.ts b/src/agents/pi-embedded-runner/manual-compaction-boundary.test.ts new file mode 100644 index 00000000000..61e2080b726 --- /dev/null +++ b/src/agents/pi-embedded-runner/manual-compaction-boundary.test.ts @@ -0,0 +1,109 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { SessionManager } from "@mariozechner/pi-coding-agent"; +import { afterEach, describe, expect, it } from "vitest"; +import { hardenManualCompactionBoundary } from "./manual-compaction-boundary.js"; + +let tmpDir = ""; + +async function makeTmpDir(): Promise { + tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "manual-compaction-boundary-")); + return tmpDir; +} + +afterEach(async () => { + if (tmpDir) { + await fs.rm(tmpDir, { recursive: true, force: true }).catch(() => {}); + tmpDir = ""; + } +}); + +function messageText(message: { content?: unknown }): string { + const content = message.content; + if (typeof content === "string") { + return content; + } + if (!Array.isArray(content)) { + return ""; + } + return content + .map((block) => + block && typeof block === "object" && "text" in block && typeof block.text === "string" + ? block.text + : "", + ) + .filter(Boolean) + .join(" "); +} + +describe("hardenManualCompactionBoundary", () => { + it("turns manual compaction into a true checkpoint for rebuilt context", async () => { + const dir = await makeTmpDir(); + const session = SessionManager.create(dir, dir); + + session.appendMessage({ role: "user", content: "old question", timestamp: 1 }); + session.appendMessage({ + role: "assistant", + content: [{ type: "text", text: "very long old answer" }], + timestamp: 2, + }); + const firstKeepId = session.getBranch().at(-1)?.id; + expect(firstKeepId).toBeTruthy(); + session.appendCompaction("old summary", firstKeepId!, 100); + + session.appendMessage({ role: "user", content: "new question", timestamp: 3 }); + session.appendMessage({ + role: "assistant", + content: [{ type: "text", text: "detailed new answer that should be summarized away" }], + timestamp: 4, + }); + const secondKeepId = session.getBranch().at(-1)?.id; + expect(secondKeepId).toBeTruthy(); + const latestCompactionId = session.appendCompaction("fresh summary", secondKeepId!, 200); + const sessionFile = session.getSessionFile(); + expect(sessionFile).toBeTruthy(); + + const before = SessionManager.open(sessionFile!); + const beforeTexts = before + .buildSessionContext() + .messages.map((message) => messageText(message)); + expect(beforeTexts.join("\n")).toContain("detailed new answer"); + + const hardened = await hardenManualCompactionBoundary({ sessionFile: sessionFile! }); + expect(hardened.applied).toBe(true); + expect(hardened.firstKeptEntryId).toBe(latestCompactionId); + expect(hardened.messages.map((message) => message.role)).toEqual(["compactionSummary"]); + + const reopened = SessionManager.open(sessionFile!); + const latest = reopened.getLeafEntry(); + expect(latest?.type).toBe("compaction"); + expect(latest?.firstKeptEntryId).toBe(latestCompactionId); + + reopened.appendMessage({ role: "user", content: "what was happening?", timestamp: 5 }); + const after = SessionManager.open(sessionFile!); + const afterTexts = after.buildSessionContext().messages.map((message) => messageText(message)); + expect(after.buildSessionContext().messages.map((message) => message.role)).toEqual([ + "compactionSummary", + "user", + ]); + expect(afterTexts.join("\n")).not.toContain("detailed new answer"); + }); + + it("is a no-op when the latest leaf is not a compaction entry", async () => { + const dir = await makeTmpDir(); + const session = SessionManager.create(dir, dir); + session.appendMessage({ role: "user", content: "hello", timestamp: 1 }); + session.appendMessage({ + role: "assistant", + content: [{ type: "text", text: "hi" }], + timestamp: 2, + }); + const sessionFile = session.getSessionFile(); + expect(sessionFile).toBeTruthy(); + + const result = await hardenManualCompactionBoundary({ sessionFile: sessionFile! }); + expect(result.applied).toBe(false); + expect(result.messages.map((message) => message.role)).toEqual(["user", "assistant"]); + }); +}); diff --git a/src/agents/pi-embedded-runner/manual-compaction-boundary.ts b/src/agents/pi-embedded-runner/manual-compaction-boundary.ts new file mode 100644 index 00000000000..2fc8ef5f1eb --- /dev/null +++ b/src/agents/pi-embedded-runner/manual-compaction-boundary.ts @@ -0,0 +1,103 @@ +import fs from "node:fs/promises"; +import type { AgentMessage } from "@mariozechner/pi-agent-core"; +import { SessionManager } from "@mariozechner/pi-coding-agent"; + +type SessionManagerLike = ReturnType; +type SessionEntry = ReturnType[number]; +type SessionHeader = NonNullable>; +type CompactionEntry = Extract; + +export type HardenedManualCompactionBoundary = { + applied: boolean; + firstKeptEntryId?: string; + leafId?: string; + messages: AgentMessage[]; +}; + +function serializeSessionFile(header: SessionHeader, entries: SessionEntry[]): string { + return ( + [JSON.stringify(header), ...entries.map((entry) => JSON.stringify(entry))].join("\n") + "\n" + ); +} + +function replaceLatestCompactionBoundary(params: { + entries: SessionEntry[]; + compactionEntryId: string; +}): SessionEntry[] { + return params.entries.map((entry) => { + if (entry.type !== "compaction" || entry.id !== params.compactionEntryId) { + return entry; + } + return { + ...entry, + // Manual /compact is an explicit checkpoint request, so make the + // rebuilt context start from the summary itself instead of preserving + // an upstream "recent tail" that can keep large prior turns alive. + firstKeptEntryId: entry.id, + } satisfies CompactionEntry; + }); +} + +export async function hardenManualCompactionBoundary(params: { + sessionFile: string; +}): Promise { + const sessionManager = SessionManager.open(params.sessionFile) as Partial; + if ( + typeof sessionManager.getHeader !== "function" || + typeof sessionManager.getLeafEntry !== "function" || + typeof sessionManager.buildSessionContext !== "function" || + typeof sessionManager.getEntries !== "function" + ) { + return { + applied: false, + messages: [], + }; + } + + const header = sessionManager.getHeader(); + const leaf = sessionManager.getLeafEntry(); + if (!header || leaf?.type !== "compaction") { + const sessionContext = sessionManager.buildSessionContext(); + return { + applied: false, + leafId: + typeof sessionManager.getLeafId === "function" + ? (sessionManager.getLeafId() ?? undefined) + : undefined, + messages: sessionContext.messages, + }; + } + + if (leaf.firstKeptEntryId === leaf.id) { + const sessionContext = sessionManager.buildSessionContext(); + return { + applied: false, + firstKeptEntryId: leaf.id, + leafId: + typeof sessionManager.getLeafId === "function" + ? (sessionManager.getLeafId() ?? undefined) + : undefined, + messages: sessionContext.messages, + }; + } + + const content = serializeSessionFile( + header, + replaceLatestCompactionBoundary({ + entries: sessionManager.getEntries(), + compactionEntryId: leaf.id, + }), + ); + const tmpFile = `${params.sessionFile}.manual-compaction-tmp`; + await fs.writeFile(tmpFile, content, "utf-8"); + await fs.rename(tmpFile, params.sessionFile); + + const refreshed = SessionManager.open(params.sessionFile); + const sessionContext = refreshed.buildSessionContext(); + return { + applied: true, + firstKeptEntryId: leaf.id, + leafId: refreshed.getLeafId() ?? undefined, + messages: sessionContext.messages, + }; +} diff --git a/ui/src/ui/controllers/sessions.test.ts b/ui/src/ui/controllers/sessions.test.ts index 5b8a21da957..bfed20ce153 100644 --- a/ui/src/ui/controllers/sessions.test.ts +++ b/ui/src/ui/controllers/sessions.test.ts @@ -1,5 +1,10 @@ import { afterEach, describe, expect, it, vi } from "vitest"; -import { deleteSessionsAndRefresh, subscribeSessions, type SessionsState } from "./sessions.ts"; +import { + deleteSessionsAndRefresh, + loadSessions, + subscribeSessions, + type SessionsState, +} from "./sessions.ts"; type RequestFn = (method: string, params?: unknown) => Promise; @@ -125,3 +130,91 @@ describe("deleteSessionsAndRefresh", () => { expect(request).not.toHaveBeenCalled(); }); }); + +describe("loadSessions", () => { + it("refreshes expanded checkpoint cards when the row summary changes", async () => { + const request = vi.fn(async (method: string) => { + if (method === "sessions.list") { + return { + ts: 1, + path: "(multiple)", + count: 1, + defaults: {}, + sessions: [ + { + key: "agent:main:main", + kind: "direct", + updatedAt: 1, + compactionCheckpointCount: 1, + latestCompactionCheckpoint: { + checkpointId: "checkpoint-new", + createdAt: 20, + }, + }, + ], + }; + } + if (method === "sessions.compaction.list") { + return { + ok: true, + key: "agent:main:main", + checkpoints: [ + { + checkpointId: "checkpoint-new", + sessionKey: "agent:main:main", + sessionId: "session-1", + createdAt: 20, + reason: "manual", + }, + ], + }; + } + throw new Error(`unexpected method: ${method}`); + }); + const state = createState(request, { + sessionsExpandedCheckpointKey: "agent:main:main", + sessionsResult: { + ts: 0, + path: "(multiple)", + count: 1, + defaults: {}, + sessions: [ + { + key: "agent:main:main", + kind: "direct", + updatedAt: 0, + compactionCheckpointCount: 3, + latestCompactionCheckpoint: { + checkpointId: "checkpoint-old", + createdAt: 10, + }, + }, + ], + } as never, + sessionsCheckpointItemsByKey: { + "agent:main:main": [ + { + checkpointId: "checkpoint-old", + sessionKey: "agent:main:main", + sessionId: "session-old", + createdAt: 10, + reason: "manual", + }, + ] as never, + }, + }); + + await loadSessions(state); + + expect(request).toHaveBeenNthCalledWith(1, "sessions.list", { + includeGlobal: true, + includeUnknown: true, + }); + expect(request).toHaveBeenNthCalledWith(2, "sessions.compaction.list", { + key: "agent:main:main", + }); + expect( + state.sessionsCheckpointItemsByKey["agent:main:main"]?.map((item) => item.checkpointId), + ).toEqual(["checkpoint-new"]); + }); +}); diff --git a/ui/src/ui/controllers/sessions.ts b/ui/src/ui/controllers/sessions.ts index a435630b507..d800579a85f 100644 --- a/ui/src/ui/controllers/sessions.ts +++ b/ui/src/ui/controllers/sessions.ts @@ -29,6 +29,67 @@ export type SessionsState = { sessionsCheckpointErrorByKey: Record; }; +function checkpointSignature( + row: + | { + key: string; + compactionCheckpointCount?: number; + latestCompactionCheckpoint?: { checkpointId?: string; createdAt?: number } | null; + } + | undefined, +): string { + return JSON.stringify({ + key: row?.key ?? "", + count: row?.compactionCheckpointCount ?? 0, + latestCheckpointId: row?.latestCompactionCheckpoint?.checkpointId ?? "", + latestCreatedAt: row?.latestCompactionCheckpoint?.createdAt ?? 0, + }); +} + +function invalidateCheckpointCacheForKey(state: SessionsState, key: string) { + if ( + !(key in state.sessionsCheckpointItemsByKey) && + !(key in state.sessionsCheckpointErrorByKey) + ) { + return; + } + const nextItems = { ...state.sessionsCheckpointItemsByKey }; + const nextErrors = { ...state.sessionsCheckpointErrorByKey }; + delete nextItems[key]; + delete nextErrors[key]; + state.sessionsCheckpointItemsByKey = nextItems; + state.sessionsCheckpointErrorByKey = nextErrors; +} + +async function fetchSessionCompactionCheckpoints(state: SessionsState, key: string) { + state.sessionsCheckpointLoadingKey = key; + state.sessionsCheckpointErrorByKey = { + ...state.sessionsCheckpointErrorByKey, + [key]: "", + }; + try { + const result = await state.client?.request( + "sessions.compaction.list", + { key }, + ); + if (result) { + state.sessionsCheckpointItemsByKey = { + ...state.sessionsCheckpointItemsByKey, + [key]: result.checkpoints ?? [], + }; + } + } catch (err) { + state.sessionsCheckpointErrorByKey = { + ...state.sessionsCheckpointErrorByKey, + [key]: String(err), + }; + } finally { + if (state.sessionsCheckpointLoadingKey === key) { + state.sessionsCheckpointLoadingKey = null; + } + } +} + export async function subscribeSessions(state: SessionsState) { if (!state.client || !state.connected) { return; @@ -58,6 +119,9 @@ export async function loadSessions( state.sessionsLoading = true; state.sessionsError = null; try { + const previousRows = new Map( + (state.sessionsResult?.sessions ?? []).map((row) => [row.key, row] as const), + ); const includeGlobal = overrides?.includeGlobal ?? state.sessionsIncludeGlobal; const includeUnknown = overrides?.includeUnknown ?? state.sessionsIncludeUnknown; const activeMinutes = overrides?.activeMinutes ?? toNumber(state.sessionsFilterActive, 0); @@ -75,6 +139,30 @@ export async function loadSessions( const res = await state.client.request("sessions.list", params); if (res) { state.sessionsResult = res; + const nextKeys = new Set(res.sessions.map((row) => row.key)); + for (const key of Object.keys(state.sessionsCheckpointItemsByKey)) { + if (!nextKeys.has(key)) { + invalidateCheckpointCacheForKey(state, key); + } + } + let expandedNeedsRefetch = false; + for (const row of res.sessions) { + const previous = previousRows.get(row.key); + if (checkpointSignature(previous) !== checkpointSignature(row)) { + invalidateCheckpointCacheForKey(state, row.key); + if (state.sessionsExpandedCheckpointKey === row.key) { + expandedNeedsRefetch = true; + } + } + } + const expandedKey = state.sessionsExpandedCheckpointKey; + if ( + expandedKey && + nextKeys.has(expandedKey) && + (expandedNeedsRefetch || !state.sessionsCheckpointItemsByKey[expandedKey]) + ) { + await fetchSessionCompactionCheckpoints(state, expandedKey); + } } } catch (err) { if (isMissingOperatorReadScopeError(err)) { @@ -181,32 +269,7 @@ export async function toggleSessionCompactionCheckpoints(state: SessionsState, k if (state.sessionsCheckpointItemsByKey[trimmedKey]) { return; } - state.sessionsCheckpointLoadingKey = trimmedKey; - state.sessionsCheckpointErrorByKey = { - ...state.sessionsCheckpointErrorByKey, - [trimmedKey]: "", - }; - try { - const result = await state.client?.request( - "sessions.compaction.list", - { key: trimmedKey }, - ); - if (result) { - state.sessionsCheckpointItemsByKey = { - ...state.sessionsCheckpointItemsByKey, - [trimmedKey]: result.checkpoints ?? [], - }; - } - } catch (err) { - state.sessionsCheckpointErrorByKey = { - ...state.sessionsCheckpointErrorByKey, - [trimmedKey]: String(err), - }; - } finally { - if (state.sessionsCheckpointLoadingKey === trimmedKey) { - state.sessionsCheckpointLoadingKey = null; - } - } + await fetchSessionCompactionCheckpoints(state, trimmedKey); } export async function branchSessionFromCheckpoint(