Gateway: harden manual compaction checkpoints

This commit is contained in:
scoootscooob
2026-04-06 16:39:43 -07:00
parent 66a281c4f9
commit 4af9bf751f
5 changed files with 422 additions and 31 deletions

View File

@@ -115,6 +115,7 @@ import { applyExtraParamsToAgent } from "./extra-params.js";
import { getDmHistoryLimitFromSessionKey, limitHistoryTurns } from "./history.js";
import { resolveGlobalLane, resolveSessionLane } from "./lanes.js";
import { log } from "./logger.js";
import { hardenManualCompactionBoundary } from "./manual-compaction-boundary.js";
import { buildEmbeddedMessageActionDiscoveryInput } from "./message-action-discovery-input.js";
import { readPiModelContextTokens } from "./model-context-tokens.js";
import { buildModelAliasLines, resolveModelAsync } from "./model.js";
@@ -973,6 +974,28 @@ export async function compactEmbeddedPiSessionDirect(
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
});
let effectiveFirstKeptEntryId = result.firstKeptEntryId;
let postCompactionLeafId =
typeof sessionManager.getLeafId === "function"
? (sessionManager.getLeafId() ?? undefined)
: undefined;
if (params.trigger === "manual") {
try {
const hardenedBoundary = await hardenManualCompactionBoundary({
sessionFile: params.sessionFile,
});
if (hardenedBoundary.applied) {
effectiveFirstKeptEntryId =
hardenedBoundary.firstKeptEntryId ?? effectiveFirstKeptEntryId;
postCompactionLeafId = hardenedBoundary.leafId ?? postCompactionLeafId;
session.agent.state.messages = hardenedBoundary.messages;
}
} catch (err) {
log.warn("[compaction] failed to harden manual compaction boundary", {
errorMessage: err instanceof Error ? err.message : String(err),
});
}
}
// Estimate tokens after compaction by summing token estimates for remaining messages
const tokensAfter = estimateTokensAfterCompaction({
messagesAfter: session.messages,
@@ -984,7 +1007,6 @@ export async function compactEmbeddedPiSessionDirect(
const compactedCount = Math.max(0, messageCountCompactionInput - messageCountAfter);
if (params.config && params.sessionKey && checkpointSnapshot) {
try {
const postCompactionLeafId = sessionManager.getLeafId() ?? undefined;
const storedCheckpoint = await persistSessionCompactionCheckpoint({
cfg: params.config,
sessionKey: params.sessionKey,
@@ -994,7 +1016,7 @@ export async function compactEmbeddedPiSessionDirect(
}),
snapshot: checkpointSnapshot,
summary: result.summary,
firstKeptEntryId: result.firstKeptEntryId,
firstKeptEntryId: effectiveFirstKeptEntryId,
tokensBefore: observedTokenCount ?? result.tokensBefore,
tokensAfter,
postSessionFile: params.sessionFile,
@@ -1040,7 +1062,7 @@ export async function compactEmbeddedPiSessionDirect(
sessionFile: params.sessionFile,
summaryLength: typeof result.summary === "string" ? result.summary.length : undefined,
tokensBefore: result.tokensBefore,
firstKeptEntryId: result.firstKeptEntryId,
firstKeptEntryId: effectiveFirstKeptEntryId,
});
// Truncate session file to remove compacted entries (#39953)
if (params.config?.agents?.defaults?.compaction?.truncateAfterCompaction) {
@@ -1072,7 +1094,7 @@ export async function compactEmbeddedPiSessionDirect(
compacted: true,
result: {
summary: result.summary,
firstKeptEntryId: result.firstKeptEntryId,
firstKeptEntryId: effectiveFirstKeptEntryId,
tokensBefore: observedTokenCount ?? result.tokensBefore,
tokensAfter,
details: result.details,
@@ -1368,6 +1390,7 @@ export const __testing = {
containsRealConversationMessages,
estimateTokensAfterCompaction,
buildBeforeCompactionHookMetrics,
hardenManualCompactionBoundary,
runBeforeCompactionHooks,
runAfterCompactionHooks,
runPostCompactionSideEffects,

View File

@@ -0,0 +1,109 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import { afterEach, describe, expect, it } from "vitest";
import { hardenManualCompactionBoundary } from "./manual-compaction-boundary.js";
let tmpDir = "";
async function makeTmpDir(): Promise<string> {
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "manual-compaction-boundary-"));
return tmpDir;
}
afterEach(async () => {
if (tmpDir) {
await fs.rm(tmpDir, { recursive: true, force: true }).catch(() => {});
tmpDir = "";
}
});
function messageText(message: { content?: unknown }): string {
const content = message.content;
if (typeof content === "string") {
return content;
}
if (!Array.isArray(content)) {
return "";
}
return content
.map((block) =>
block && typeof block === "object" && "text" in block && typeof block.text === "string"
? block.text
: "",
)
.filter(Boolean)
.join(" ");
}
describe("hardenManualCompactionBoundary", () => {
it("turns manual compaction into a true checkpoint for rebuilt context", async () => {
const dir = await makeTmpDir();
const session = SessionManager.create(dir, dir);
session.appendMessage({ role: "user", content: "old question", timestamp: 1 });
session.appendMessage({
role: "assistant",
content: [{ type: "text", text: "very long old answer" }],
timestamp: 2,
});
const firstKeepId = session.getBranch().at(-1)?.id;
expect(firstKeepId).toBeTruthy();
session.appendCompaction("old summary", firstKeepId!, 100);
session.appendMessage({ role: "user", content: "new question", timestamp: 3 });
session.appendMessage({
role: "assistant",
content: [{ type: "text", text: "detailed new answer that should be summarized away" }],
timestamp: 4,
});
const secondKeepId = session.getBranch().at(-1)?.id;
expect(secondKeepId).toBeTruthy();
const latestCompactionId = session.appendCompaction("fresh summary", secondKeepId!, 200);
const sessionFile = session.getSessionFile();
expect(sessionFile).toBeTruthy();
const before = SessionManager.open(sessionFile!);
const beforeTexts = before
.buildSessionContext()
.messages.map((message) => messageText(message));
expect(beforeTexts.join("\n")).toContain("detailed new answer");
const hardened = await hardenManualCompactionBoundary({ sessionFile: sessionFile! });
expect(hardened.applied).toBe(true);
expect(hardened.firstKeptEntryId).toBe(latestCompactionId);
expect(hardened.messages.map((message) => message.role)).toEqual(["compactionSummary"]);
const reopened = SessionManager.open(sessionFile!);
const latest = reopened.getLeafEntry();
expect(latest?.type).toBe("compaction");
expect(latest?.firstKeptEntryId).toBe(latestCompactionId);
reopened.appendMessage({ role: "user", content: "what was happening?", timestamp: 5 });
const after = SessionManager.open(sessionFile!);
const afterTexts = after.buildSessionContext().messages.map((message) => messageText(message));
expect(after.buildSessionContext().messages.map((message) => message.role)).toEqual([
"compactionSummary",
"user",
]);
expect(afterTexts.join("\n")).not.toContain("detailed new answer");
});
it("is a no-op when the latest leaf is not a compaction entry", async () => {
const dir = await makeTmpDir();
const session = SessionManager.create(dir, dir);
session.appendMessage({ role: "user", content: "hello", timestamp: 1 });
session.appendMessage({
role: "assistant",
content: [{ type: "text", text: "hi" }],
timestamp: 2,
});
const sessionFile = session.getSessionFile();
expect(sessionFile).toBeTruthy();
const result = await hardenManualCompactionBoundary({ sessionFile: sessionFile! });
expect(result.applied).toBe(false);
expect(result.messages.map((message) => message.role)).toEqual(["user", "assistant"]);
});
});

View File

@@ -0,0 +1,103 @@
import fs from "node:fs/promises";
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import { SessionManager } from "@mariozechner/pi-coding-agent";
type SessionManagerLike = ReturnType<typeof SessionManager.open>;
type SessionEntry = ReturnType<SessionManagerLike["getEntries"]>[number];
type SessionHeader = NonNullable<ReturnType<SessionManagerLike["getHeader"]>>;
type CompactionEntry = Extract<SessionEntry, { type: "compaction" }>;
export type HardenedManualCompactionBoundary = {
applied: boolean;
firstKeptEntryId?: string;
leafId?: string;
messages: AgentMessage[];
};
function serializeSessionFile(header: SessionHeader, entries: SessionEntry[]): string {
return (
[JSON.stringify(header), ...entries.map((entry) => JSON.stringify(entry))].join("\n") + "\n"
);
}
function replaceLatestCompactionBoundary(params: {
entries: SessionEntry[];
compactionEntryId: string;
}): SessionEntry[] {
return params.entries.map((entry) => {
if (entry.type !== "compaction" || entry.id !== params.compactionEntryId) {
return entry;
}
return {
...entry,
// Manual /compact is an explicit checkpoint request, so make the
// rebuilt context start from the summary itself instead of preserving
// an upstream "recent tail" that can keep large prior turns alive.
firstKeptEntryId: entry.id,
} satisfies CompactionEntry;
});
}
export async function hardenManualCompactionBoundary(params: {
sessionFile: string;
}): Promise<HardenedManualCompactionBoundary> {
const sessionManager = SessionManager.open(params.sessionFile) as Partial<SessionManagerLike>;
if (
typeof sessionManager.getHeader !== "function" ||
typeof sessionManager.getLeafEntry !== "function" ||
typeof sessionManager.buildSessionContext !== "function" ||
typeof sessionManager.getEntries !== "function"
) {
return {
applied: false,
messages: [],
};
}
const header = sessionManager.getHeader();
const leaf = sessionManager.getLeafEntry();
if (!header || leaf?.type !== "compaction") {
const sessionContext = sessionManager.buildSessionContext();
return {
applied: false,
leafId:
typeof sessionManager.getLeafId === "function"
? (sessionManager.getLeafId() ?? undefined)
: undefined,
messages: sessionContext.messages,
};
}
if (leaf.firstKeptEntryId === leaf.id) {
const sessionContext = sessionManager.buildSessionContext();
return {
applied: false,
firstKeptEntryId: leaf.id,
leafId:
typeof sessionManager.getLeafId === "function"
? (sessionManager.getLeafId() ?? undefined)
: undefined,
messages: sessionContext.messages,
};
}
const content = serializeSessionFile(
header,
replaceLatestCompactionBoundary({
entries: sessionManager.getEntries(),
compactionEntryId: leaf.id,
}),
);
const tmpFile = `${params.sessionFile}.manual-compaction-tmp`;
await fs.writeFile(tmpFile, content, "utf-8");
await fs.rename(tmpFile, params.sessionFile);
const refreshed = SessionManager.open(params.sessionFile);
const sessionContext = refreshed.buildSessionContext();
return {
applied: true,
firstKeptEntryId: leaf.id,
leafId: refreshed.getLeafId() ?? undefined,
messages: sessionContext.messages,
};
}

View File

@@ -1,5 +1,10 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { deleteSessionsAndRefresh, subscribeSessions, type SessionsState } from "./sessions.ts";
import {
deleteSessionsAndRefresh,
loadSessions,
subscribeSessions,
type SessionsState,
} from "./sessions.ts";
type RequestFn = (method: string, params?: unknown) => Promise<unknown>;
@@ -125,3 +130,91 @@ describe("deleteSessionsAndRefresh", () => {
expect(request).not.toHaveBeenCalled();
});
});
describe("loadSessions", () => {
it("refreshes expanded checkpoint cards when the row summary changes", async () => {
const request = vi.fn(async (method: string) => {
if (method === "sessions.list") {
return {
ts: 1,
path: "(multiple)",
count: 1,
defaults: {},
sessions: [
{
key: "agent:main:main",
kind: "direct",
updatedAt: 1,
compactionCheckpointCount: 1,
latestCompactionCheckpoint: {
checkpointId: "checkpoint-new",
createdAt: 20,
},
},
],
};
}
if (method === "sessions.compaction.list") {
return {
ok: true,
key: "agent:main:main",
checkpoints: [
{
checkpointId: "checkpoint-new",
sessionKey: "agent:main:main",
sessionId: "session-1",
createdAt: 20,
reason: "manual",
},
],
};
}
throw new Error(`unexpected method: ${method}`);
});
const state = createState(request, {
sessionsExpandedCheckpointKey: "agent:main:main",
sessionsResult: {
ts: 0,
path: "(multiple)",
count: 1,
defaults: {},
sessions: [
{
key: "agent:main:main",
kind: "direct",
updatedAt: 0,
compactionCheckpointCount: 3,
latestCompactionCheckpoint: {
checkpointId: "checkpoint-old",
createdAt: 10,
},
},
],
} as never,
sessionsCheckpointItemsByKey: {
"agent:main:main": [
{
checkpointId: "checkpoint-old",
sessionKey: "agent:main:main",
sessionId: "session-old",
createdAt: 10,
reason: "manual",
},
] as never,
},
});
await loadSessions(state);
expect(request).toHaveBeenNthCalledWith(1, "sessions.list", {
includeGlobal: true,
includeUnknown: true,
});
expect(request).toHaveBeenNthCalledWith(2, "sessions.compaction.list", {
key: "agent:main:main",
});
expect(
state.sessionsCheckpointItemsByKey["agent:main:main"]?.map((item) => item.checkpointId),
).toEqual(["checkpoint-new"]);
});
});

View File

@@ -29,6 +29,67 @@ export type SessionsState = {
sessionsCheckpointErrorByKey: Record<string, string>;
};
function checkpointSignature(
row:
| {
key: string;
compactionCheckpointCount?: number;
latestCompactionCheckpoint?: { checkpointId?: string; createdAt?: number } | null;
}
| undefined,
): string {
return JSON.stringify({
key: row?.key ?? "",
count: row?.compactionCheckpointCount ?? 0,
latestCheckpointId: row?.latestCompactionCheckpoint?.checkpointId ?? "",
latestCreatedAt: row?.latestCompactionCheckpoint?.createdAt ?? 0,
});
}
function invalidateCheckpointCacheForKey(state: SessionsState, key: string) {
if (
!(key in state.sessionsCheckpointItemsByKey) &&
!(key in state.sessionsCheckpointErrorByKey)
) {
return;
}
const nextItems = { ...state.sessionsCheckpointItemsByKey };
const nextErrors = { ...state.sessionsCheckpointErrorByKey };
delete nextItems[key];
delete nextErrors[key];
state.sessionsCheckpointItemsByKey = nextItems;
state.sessionsCheckpointErrorByKey = nextErrors;
}
async function fetchSessionCompactionCheckpoints(state: SessionsState, key: string) {
state.sessionsCheckpointLoadingKey = key;
state.sessionsCheckpointErrorByKey = {
...state.sessionsCheckpointErrorByKey,
[key]: "",
};
try {
const result = await state.client?.request<SessionsCompactionListResult>(
"sessions.compaction.list",
{ key },
);
if (result) {
state.sessionsCheckpointItemsByKey = {
...state.sessionsCheckpointItemsByKey,
[key]: result.checkpoints ?? [],
};
}
} catch (err) {
state.sessionsCheckpointErrorByKey = {
...state.sessionsCheckpointErrorByKey,
[key]: String(err),
};
} finally {
if (state.sessionsCheckpointLoadingKey === key) {
state.sessionsCheckpointLoadingKey = null;
}
}
}
export async function subscribeSessions(state: SessionsState) {
if (!state.client || !state.connected) {
return;
@@ -58,6 +119,9 @@ export async function loadSessions(
state.sessionsLoading = true;
state.sessionsError = null;
try {
const previousRows = new Map(
(state.sessionsResult?.sessions ?? []).map((row) => [row.key, row] as const),
);
const includeGlobal = overrides?.includeGlobal ?? state.sessionsIncludeGlobal;
const includeUnknown = overrides?.includeUnknown ?? state.sessionsIncludeUnknown;
const activeMinutes = overrides?.activeMinutes ?? toNumber(state.sessionsFilterActive, 0);
@@ -75,6 +139,30 @@ export async function loadSessions(
const res = await state.client.request<SessionsListResult | undefined>("sessions.list", params);
if (res) {
state.sessionsResult = res;
const nextKeys = new Set(res.sessions.map((row) => row.key));
for (const key of Object.keys(state.sessionsCheckpointItemsByKey)) {
if (!nextKeys.has(key)) {
invalidateCheckpointCacheForKey(state, key);
}
}
let expandedNeedsRefetch = false;
for (const row of res.sessions) {
const previous = previousRows.get(row.key);
if (checkpointSignature(previous) !== checkpointSignature(row)) {
invalidateCheckpointCacheForKey(state, row.key);
if (state.sessionsExpandedCheckpointKey === row.key) {
expandedNeedsRefetch = true;
}
}
}
const expandedKey = state.sessionsExpandedCheckpointKey;
if (
expandedKey &&
nextKeys.has(expandedKey) &&
(expandedNeedsRefetch || !state.sessionsCheckpointItemsByKey[expandedKey])
) {
await fetchSessionCompactionCheckpoints(state, expandedKey);
}
}
} catch (err) {
if (isMissingOperatorReadScopeError(err)) {
@@ -181,32 +269,7 @@ export async function toggleSessionCompactionCheckpoints(state: SessionsState, k
if (state.sessionsCheckpointItemsByKey[trimmedKey]) {
return;
}
state.sessionsCheckpointLoadingKey = trimmedKey;
state.sessionsCheckpointErrorByKey = {
...state.sessionsCheckpointErrorByKey,
[trimmedKey]: "",
};
try {
const result = await state.client?.request<SessionsCompactionListResult>(
"sessions.compaction.list",
{ key: trimmedKey },
);
if (result) {
state.sessionsCheckpointItemsByKey = {
...state.sessionsCheckpointItemsByKey,
[trimmedKey]: result.checkpoints ?? [],
};
}
} catch (err) {
state.sessionsCheckpointErrorByKey = {
...state.sessionsCheckpointErrorByKey,
[trimmedKey]: String(err),
};
} finally {
if (state.sessionsCheckpointLoadingKey === trimmedKey) {
state.sessionsCheckpointLoadingKey = null;
}
}
await fetchSessionCompactionCheckpoints(state, trimmedKey);
}
export async function branchSessionFromCheckpoint(