perf: avoid session manager opens for transcript maintenance

This commit is contained in:
Peter Steinberger
2026-05-02 05:58:29 +01:00
parent d4bdd40c92
commit f7fe6ad55e
12 changed files with 712 additions and 99 deletions

View File

@@ -24,6 +24,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Feishu: preserve Feishu/Lark HTTP error bodies for message sends, media sends, and chat member lookups, so HTTP 400 failures include vendor code, message, log id, and troubleshooter details. Fixes #73860. Thanks @desksk.
- Agents/transcripts: avoid reopening large Pi transcript files through the synchronous session manager for maintenance rewrites, persisted tool-result truncation, manual compaction boundary hardening, and queued compaction rotation. Thanks @mariozechner.
- Telegram: inherit the process DNS result order for Bot API transport and downgrade recovered sticky IPv4 fallback promotions to debug logs, while keeping pinned-IP escalation warnings visible. Fixes #75904. Thanks @highfly-hi and @neeravmakwana.
- Web search/MiniMax: allow `MINIMAX_OAUTH_TOKEN` to satisfy MiniMax Search credentials, so OAuth-authorized MiniMax Token Plan setups do not need a separate web-search key. Fixes #65768. Thanks @kikibrian and @zhouhe-xydt.
- Providers/MiniMax: derive Coding Plan usage polling from the configured MiniMax base URL, so global setups no longer query the CN usage host. Fixes #65054. Thanks @sixone74 and @Yanhu007.

View File

@@ -1,4 +1,3 @@
import { SessionManager } from "@mariozechner/pi-coding-agent";
import { ensureContextEnginesInitialized } from "../../context-engine/init.js";
import { resolveContextEngine } from "../../context-engine/registry.js";
import type { ContextEngineRuntimeContext } from "../../context-engine/types.js";
@@ -28,7 +27,7 @@ import {
resolveEmbeddedCompactionTarget,
} from "./compaction-runtime-context.js";
import {
rotateTranscriptAfterCompaction,
rotateTranscriptFileAfterCompaction,
shouldRotateCompactionTranscript,
} from "./compaction-successor-transcript.js";
import { runContextEngineMaintenance } from "./context-engine-maintenance.js";
@@ -177,8 +176,7 @@ export async function compactEmbeddedPiSession(
if (result.ok && result.compacted) {
if (shouldRotateCompactionTranscript(params.config) && !delegatedRotatedTranscript) {
try {
const rotation = await rotateTranscriptAfterCompaction({
sessionManager: SessionManager.open(params.sessionFile),
const rotation = await rotateTranscriptFileAfterCompaction({
sessionFile: params.sessionFile,
});
if (rotation.rotated) {

View File

@@ -153,6 +153,7 @@ import {
toSessionToolAllowlist,
} from "./tool-name-allowlist.js";
import { splitSdkTools } from "./tool-split.js";
import { readTranscriptFileState } from "./transcript-file-state.js";
import type { EmbeddedPiCompactResult } from "./types.js";
import { mapThinkingLevel } from "./utils.js";
import { flushPendingToolResultsAfterIdle } from "./wait-for-idle-before-flush.js";
@@ -1172,7 +1173,9 @@ async function compactEmbeddedPiSessionDirectOnce(
typeof sessionManager.getLeafId === "function"
? (sessionManager.getLeafId() ?? undefined)
: undefined;
let transcriptRotationSessionManager = sessionManager;
let transcriptRotationSessionManager: Parameters<
typeof rotateTranscriptAfterCompaction
>[0]["sessionManager"] = sessionManager;
if (params.trigger === "manual") {
try {
const hardenedBoundary = await hardenManualCompactionBoundary({
@@ -1185,7 +1188,9 @@ async function compactEmbeddedPiSessionDirectOnce(
hardenedBoundary.firstKeptEntryId ?? effectiveFirstKeptEntryId;
postCompactionLeafId = hardenedBoundary.leafId ?? postCompactionLeafId;
session.agent.state.messages = hardenedBoundary.messages;
transcriptRotationSessionManager = SessionManager.open(params.sessionFile);
transcriptRotationSessionManager = await readTranscriptFileState(
params.sessionFile,
);
}
} catch (err) {
log.warn("[compaction] failed to harden manual compaction boundary", {

View File

@@ -2,10 +2,11 @@ import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import { afterEach, describe, expect, it } from "vitest";
import { afterEach, describe, expect, it, vi } from "vitest";
import { makeAgentAssistantMessage } from "../test-helpers/agent-message-fixtures.js";
import {
rotateTranscriptAfterCompaction,
rotateTranscriptFileAfterCompaction,
shouldRotateCompactionTranscript,
} from "./compaction-successor-transcript.js";
import { hardenManualCompactionBoundary } from "./manual-compaction-boundary.js";
@@ -54,6 +55,30 @@ function createCompactedSession(sessionDir: string): {
}
describe("rotateTranscriptAfterCompaction", () => {
it("can rotate a persisted transcript without opening a manager", async () => {
const dir = await createTmpDir();
const { sessionFile } = createCompactedSession(dir);
const openSpy = vi.spyOn(SessionManager, "open").mockImplementation(() => {
throw new Error("SessionManager.open should not be used for file rotation");
});
const result = await rotateTranscriptFileAfterCompaction({
sessionFile,
now: () => new Date("2026-04-27T12:00:00.000Z"),
});
openSpy.mockRestore();
expect(result.rotated).toBe(true);
expect(result.sessionFile).toBeTruthy();
const successor = SessionManager.open(result.sessionFile!);
expect(successor.getHeader()).toMatchObject({
parentSession: sessionFile,
cwd: dir,
});
expect(successor.buildSessionContext().messages.length).toBeGreaterThan(0);
});
it("creates a compacted successor transcript and leaves the archive untouched", async () => {
const dir = await createTmpDir();
const { manager, sessionFile, firstKeptId, oldUserId } = createCompactedSession(dir);

View File

@@ -1,18 +1,21 @@
import { randomUUID } from "node:crypto";
import fs from "node:fs/promises";
import path from "node:path";
import {
CURRENT_SESSION_VERSION,
SessionManager,
type CompactionEntry,
type SessionEntry,
type SessionHeader,
} from "@mariozechner/pi-coding-agent";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { collectDuplicateUserMessageEntryIdsForCompaction } from "./compaction-duplicate-user-messages.js";
import {
readTranscriptFileState,
TranscriptFileState,
writeTranscriptFileAtomic,
} from "./transcript-file-state.js";
type ReadonlySessionManagerForRotation = Pick<
SessionManager,
TranscriptFileState,
"buildSessionContext" | "getBranch" | "getCwd" | "getEntries" | "getHeader"
>;
@@ -70,14 +73,8 @@ export async function rotateTranscriptAfterCompaction(params: {
cwd: params.sessionManager.getCwd(),
parentSession: sessionFile,
});
await writeSessionFileAtomic(successorFile, [header, ...successorEntries]);
try {
SessionManager.open(successorFile).buildSessionContext();
} catch (err) {
await fs.unlink(successorFile).catch(() => undefined);
throw err;
}
await writeTranscriptFileAtomic(successorFile, [header, ...successorEntries]);
new TranscriptFileState({ header, entries: successorEntries }).buildSessionContext();
return {
rotated: true,
@@ -89,6 +86,18 @@ export async function rotateTranscriptAfterCompaction(params: {
};
}
export async function rotateTranscriptFileAfterCompaction(params: {
sessionFile: string;
now?: () => Date;
}): Promise<CompactionTranscriptRotation> {
const state = await readTranscriptFileState(params.sessionFile);
return rotateTranscriptAfterCompaction({
sessionManager: state,
sessionFile: params.sessionFile,
...(params.now ? { now: params.now } : {}),
});
}
function findLatestCompactionIndex(entries: SessionEntry[]): number {
for (let index = entries.length - 1; index >= 0; index -= 1) {
if (entries[index]?.type === "compaction") {
@@ -263,20 +272,3 @@ function resolveSuccessorSessionFile(params: {
const fileTimestamp = params.timestamp.replace(/[:.]/g, "-");
return path.join(path.dirname(params.sessionFile), `${fileTimestamp}_${params.sessionId}.jsonl`);
}
async function writeSessionFileAtomic(
filePath: string,
entries: Array<SessionHeader | SessionEntry>,
) {
const dir = path.dirname(filePath);
await fs.mkdir(dir, { recursive: true });
const tmpFile = path.join(dir, `.${path.basename(filePath)}.${process.pid}.${randomUUID()}.tmp`);
const content = `${entries.map((entry) => JSON.stringify(entry)).join("\n")}\n`;
try {
await fs.writeFile(tmpFile, content, { encoding: "utf8", flag: "wx" });
await fs.rename(tmpFile, filePath);
} catch (err) {
await fs.unlink(tmpFile).catch(() => undefined);
throw err;
}
}

View File

@@ -4,7 +4,7 @@ import path from "node:path";
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import type { AssistantMessage } from "@mariozechner/pi-ai";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import { afterEach, describe, expect, it } from "vitest";
import { afterEach, describe, expect, it, vi } from "vitest";
import { hardenManualCompactionBoundary } from "./manual-compaction-boundary.js";
let tmpDir = "";
@@ -95,7 +95,11 @@ describe("hardenManualCompactionBoundary", () => {
.messages.map((message) => messageText(message));
expect(beforeTexts.join("\n")).toContain("detailed new answer");
const openSpy = vi.spyOn(SessionManager, "open").mockImplementation(() => {
throw new Error("SessionManager.open should not be used for boundary hardening");
});
const hardened = await hardenManualCompactionBoundary({ sessionFile: sessionFile! });
openSpy.mockRestore();
expect(hardened.applied).toBe(true);
expect(hardened.firstKeptEntryId).toBe(latestCompactionId);
expect(hardened.messages.map((message) => message.role)).toEqual(["compactionSummary"]);

View File

@@ -1,10 +1,11 @@
import fs from "node:fs/promises";
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import type { SessionEntry } from "@mariozechner/pi-coding-agent";
import {
readTranscriptFileState,
TranscriptFileState,
writeTranscriptFileAtomic,
} from "./transcript-file-state.js";
type SessionManagerLike = ReturnType<typeof SessionManager.open>;
type SessionEntry = ReturnType<SessionManagerLike["getEntries"]>[number];
type SessionHeader = NonNullable<ReturnType<SessionManagerLike["getHeader"]>>;
type CompactionEntry = Extract<SessionEntry, { type: "compaction" }>;
export type HardenedManualCompactionBoundary = {
@@ -14,12 +15,6 @@ export type HardenedManualCompactionBoundary = {
messages: AgentMessage[];
};
function serializeSessionFile(header: SessionHeader, entries: SessionEntry[]): string {
return (
[JSON.stringify(header), ...entries.map((entry) => JSON.stringify(entry))].join("\n") + "\n"
);
}
function replaceLatestCompactionBoundary(params: {
entries: SessionEntry[];
compactionEntryId: string;
@@ -42,76 +37,60 @@ export async function hardenManualCompactionBoundary(params: {
sessionFile: string;
preserveRecentTail?: boolean;
}): Promise<HardenedManualCompactionBoundary> {
const sessionManager = SessionManager.open(params.sessionFile) as Partial<SessionManagerLike>;
if (
typeof sessionManager.getHeader !== "function" ||
typeof sessionManager.getLeafEntry !== "function" ||
typeof sessionManager.buildSessionContext !== "function" ||
typeof sessionManager.getEntries !== "function"
) {
const state = await readTranscriptFileState(params.sessionFile);
const header = state.getHeader();
if (!header) {
return {
applied: false,
messages: [],
};
}
const header = sessionManager.getHeader();
const leaf = sessionManager.getLeafEntry();
if (!header || leaf?.type !== "compaction") {
const sessionContext = sessionManager.buildSessionContext();
const leaf = state.getLeafEntry();
if (leaf?.type !== "compaction") {
const sessionContext = state.buildSessionContext();
return {
applied: false,
leafId:
typeof sessionManager.getLeafId === "function"
? (sessionManager.getLeafId() ?? undefined)
: undefined,
leafId: state.getLeafId() ?? undefined,
messages: sessionContext.messages,
};
}
if (params.preserveRecentTail) {
const sessionContext = sessionManager.buildSessionContext();
const sessionContext = state.buildSessionContext();
return {
applied: false,
firstKeptEntryId: leaf.firstKeptEntryId,
leafId:
typeof sessionManager.getLeafId === "function"
? (sessionManager.getLeafId() ?? undefined)
: undefined,
leafId: state.getLeafId() ?? undefined,
messages: sessionContext.messages,
};
}
if (leaf.firstKeptEntryId === leaf.id) {
const sessionContext = sessionManager.buildSessionContext();
const sessionContext = state.buildSessionContext();
return {
applied: false,
firstKeptEntryId: leaf.id,
leafId:
typeof sessionManager.getLeafId === "function"
? (sessionManager.getLeafId() ?? undefined)
: undefined,
leafId: state.getLeafId() ?? undefined,
messages: sessionContext.messages,
};
}
const content = serializeSessionFile(
const replacedEntries = replaceLatestCompactionBoundary({
entries: state.getEntries(),
compactionEntryId: leaf.id,
});
const replacedState = new TranscriptFileState({
header,
replaceLatestCompactionBoundary({
entries: sessionManager.getEntries(),
compactionEntryId: leaf.id,
}),
);
const tmpFile = `${params.sessionFile}.manual-compaction-tmp`;
await fs.writeFile(tmpFile, content, "utf-8");
await fs.rename(tmpFile, params.sessionFile);
entries: replacedEntries,
});
await writeTranscriptFileAtomic(params.sessionFile, [header, ...replacedEntries]);
const refreshed = SessionManager.open(params.sessionFile);
const sessionContext = refreshed.buildSessionContext();
const sessionContext = replacedState.buildSessionContext();
return {
applied: true,
firstKeptEntryId: leaf.id,
leafId: refreshed.getLeafId() ?? undefined,
leafId: replacedState.getLeafId() ?? undefined,
messages: sessionContext.messages,
};
}

View File

@@ -4,7 +4,7 @@ import path from "node:path";
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import type { AssistantMessage, ToolResultMessage, UserMessage } from "@mariozechner/pi-ai";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { makeAgentAssistantMessage } from "../test-helpers/agent-message-fixtures.js";
let truncateToolResultText: typeof import("./tool-result-truncation.js").truncateToolResultText;
@@ -441,10 +441,14 @@ describe("truncateOversizedToolResultsInSession", () => {
)
.filter((length) => length > 0);
const openSpy = vi.spyOn(SessionManager, "open").mockImplementation(() => {
throw new Error("SessionManager.open should not be used for persisted truncation");
});
const result = await truncateOversizedToolResultsInSession({
sessionFile,
contextWindowTokens: 100,
});
openSpy.mockRestore();
expect(result.truncated).toBe(true);
expect(result.truncatedCount).toBeGreaterThan(0);

View File

@@ -9,7 +9,15 @@ import { resolveAgentContextLimits } from "../agent-scope.js";
import { acquireSessionWriteLock } from "../session-write-lock.js";
import { formatContextLimitTruncationNotice } from "./context-truncation-notice.js";
import { log } from "./logger.js";
import { rewriteTranscriptEntriesInSessionManager } from "./transcript-rewrite.js";
import {
persistTranscriptStateMutation,
readTranscriptFileState,
type TranscriptFileState,
} from "./transcript-file-state.js";
import {
rewriteTranscriptEntriesInSessionManager,
rewriteTranscriptEntriesInState,
} from "./transcript-rewrite.js";
/**
* Maximum share of the context window a single tool result should occupy.
@@ -664,6 +672,69 @@ function truncateOversizedToolResultsInExistingSessionManager(params: {
};
}
async function truncateOversizedToolResultsInTranscriptState(params: {
state: TranscriptFileState;
sessionFile: string;
contextWindowTokens: number;
maxCharsOverride?: number;
sessionId?: string;
sessionKey?: string;
}): Promise<{ truncated: boolean; truncatedCount: number; reason?: string }> {
const { state, contextWindowTokens } = params;
const maxChars = Math.max(
1,
params.maxCharsOverride ?? calculateMaxToolResultChars(contextWindowTokens),
);
const aggregateBudgetChars = calculateRecoveryAggregateToolResultChars(
contextWindowTokens,
maxChars,
);
const branch = state.getBranch() as ToolResultBranchEntry[];
if (branch.length === 0) {
return { truncated: false, truncatedCount: 0, reason: "empty session" };
}
const plan = buildToolResultReplacementPlan({
branch,
maxChars,
aggregateBudgetChars,
minKeepChars: RECOVERY_MIN_KEEP_CHARS,
});
if (plan.replacements.length === 0) {
return {
truncated: false,
truncatedCount: 0,
reason: "no oversized or aggregate tool results",
};
}
const rewriteResult = rewriteTranscriptEntriesInState({
state,
replacements: plan.replacements,
});
if (rewriteResult.changed) {
await persistTranscriptStateMutation({
sessionFile: params.sessionFile,
state,
appendedEntries: rewriteResult.appendedEntries,
});
emitSessionTranscriptUpdate(params.sessionFile);
}
log.info(
`[tool-result-truncation] Truncated ${rewriteResult.rewrittenEntries} tool result(s) in session ` +
`(contextWindow=${contextWindowTokens} maxChars=${maxChars} aggregateBudgetChars=${aggregateBudgetChars} ` +
`oversized=${plan.oversizedReplacementCount} aggregate=${plan.aggregateReplacementCount}) ` +
`sessionKey=${params.sessionKey ?? params.sessionId ?? "unknown"}`,
);
return {
truncated: rewriteResult.changed,
truncatedCount: rewriteResult.rewrittenEntries,
reason: rewriteResult.reason,
};
}
export function truncateOversizedToolResultsInSessionManager(params: {
sessionManager: SessionManager;
contextWindowTokens: number;
@@ -693,9 +764,9 @@ export async function truncateOversizedToolResultsInSession(params: {
try {
sessionLock = await acquireSessionWriteLock({ sessionFile });
const sessionManager = SessionManager.open(sessionFile);
return truncateOversizedToolResultsInExistingSessionManager({
sessionManager,
const state = await readTranscriptFileState(sessionFile);
return await truncateOversizedToolResultsInTranscriptState({
state,
contextWindowTokens,
maxCharsOverride: params.maxCharsOverride,
sessionFile,

View File

@@ -0,0 +1,339 @@
import { randomUUID } from "node:crypto";
import fs from "node:fs/promises";
import path from "node:path";
import {
buildSessionContext,
CURRENT_SESSION_VERSION,
migrateSessionEntries,
parseSessionEntries,
type FileEntry,
type SessionContext,
type SessionEntry,
type SessionHeader,
} from "@mariozechner/pi-coding-agent";
type BranchSummaryEntry = Extract<SessionEntry, { type: "branch_summary" }>;
type CompactionEntry = Extract<SessionEntry, { type: "compaction" }>;
type CustomEntry = Extract<SessionEntry, { type: "custom" }>;
type CustomMessageEntry = Extract<SessionEntry, { type: "custom_message" }>;
type LabelEntry = Extract<SessionEntry, { type: "label" }>;
type ModelChangeEntry = Extract<SessionEntry, { type: "model_change" }>;
type SessionInfoEntry = Extract<SessionEntry, { type: "session_info" }>;
type SessionMessageEntry = Extract<SessionEntry, { type: "message" }>;
type ThinkingLevelChangeEntry = Extract<SessionEntry, { type: "thinking_level_change" }>;
function isSessionEntry(entry: FileEntry): entry is SessionEntry {
return entry.type !== "session";
}
function sessionHeaderVersion(header: SessionHeader | null): number {
return typeof header?.version === "number" ? header.version : 1;
}
function generateEntryId(byId: { has(id: string): boolean }): string {
for (let attempt = 0; attempt < 100; attempt += 1) {
const id = randomUUID().slice(0, 8);
if (!byId.has(id)) {
return id;
}
}
return randomUUID();
}
function serializeTranscriptFileEntries(entries: FileEntry[]): string {
return `${entries.map((entry) => JSON.stringify(entry)).join("\n")}\n`;
}
export class TranscriptFileState {
readonly header: SessionHeader | null;
readonly entries: SessionEntry[];
readonly migrated: boolean;
private readonly byId = new Map<string, SessionEntry>();
private readonly labelsById = new Map<string, string>();
private readonly labelTimestampsById = new Map<string, string>();
private leafId: string | null = null;
constructor(params: {
header: SessionHeader | null;
entries: SessionEntry[];
migrated?: boolean;
}) {
this.header = params.header;
this.entries = [...params.entries];
this.migrated = params.migrated === true;
this.rebuildIndex();
}
private rebuildIndex(): void {
this.byId.clear();
this.labelsById.clear();
this.labelTimestampsById.clear();
this.leafId = null;
for (const entry of this.entries) {
this.byId.set(entry.id, entry);
this.leafId = entry.id;
if (entry.type === "label") {
if (entry.label) {
this.labelsById.set(entry.targetId, entry.label);
this.labelTimestampsById.set(entry.targetId, entry.timestamp);
} else {
this.labelsById.delete(entry.targetId);
this.labelTimestampsById.delete(entry.targetId);
}
}
}
}
getCwd(): string {
return this.header?.cwd ?? process.cwd();
}
getHeader(): SessionHeader | null {
return this.header;
}
getEntries(): SessionEntry[] {
return [...this.entries];
}
getLeafId(): string | null {
return this.leafId;
}
getLeafEntry(): SessionEntry | undefined {
return this.leafId ? this.byId.get(this.leafId) : undefined;
}
getLabel(id: string): string | undefined {
return this.labelsById.get(id);
}
getBranch(fromId?: string): SessionEntry[] {
const branch: SessionEntry[] = [];
let current = (fromId ?? this.leafId) ? this.byId.get((fromId ?? this.leafId)!) : undefined;
while (current) {
branch.unshift(current);
current = current.parentId ? this.byId.get(current.parentId) : undefined;
}
return branch;
}
buildSessionContext(): SessionContext {
return buildSessionContext(this.entries, this.leafId, this.byId);
}
branch(branchFromId: string): void {
if (!this.byId.has(branchFromId)) {
throw new Error(`Entry ${branchFromId} not found`);
}
this.leafId = branchFromId;
}
resetLeaf(): void {
this.leafId = null;
}
appendMessage(message: SessionMessageEntry["message"]): SessionMessageEntry {
return this.appendEntry({
type: "message",
id: generateEntryId(this.byId),
parentId: this.leafId,
timestamp: new Date().toISOString(),
message,
});
}
appendThinkingLevelChange(thinkingLevel: string): ThinkingLevelChangeEntry {
return this.appendEntry({
type: "thinking_level_change",
id: generateEntryId(this.byId),
parentId: this.leafId,
timestamp: new Date().toISOString(),
thinkingLevel,
});
}
appendModelChange(provider: string, modelId: string): ModelChangeEntry {
return this.appendEntry({
type: "model_change",
id: generateEntryId(this.byId),
parentId: this.leafId,
timestamp: new Date().toISOString(),
provider,
modelId,
});
}
appendCompaction(
summary: string,
firstKeptEntryId: string,
tokensBefore: number,
details?: unknown,
fromHook?: boolean,
): CompactionEntry {
return this.appendEntry({
type: "compaction",
id: generateEntryId(this.byId),
parentId: this.leafId,
timestamp: new Date().toISOString(),
summary,
firstKeptEntryId,
tokensBefore,
details,
fromHook,
});
}
appendCustomEntry(customType: string, data?: unknown): CustomEntry {
return this.appendEntry({
type: "custom",
customType,
data,
id: generateEntryId(this.byId),
parentId: this.leafId,
timestamp: new Date().toISOString(),
});
}
appendSessionInfo(name: string): SessionInfoEntry {
return this.appendEntry({
type: "session_info",
id: generateEntryId(this.byId),
parentId: this.leafId,
timestamp: new Date().toISOString(),
name: name.trim(),
});
}
appendCustomMessageEntry(
customType: string,
content: CustomMessageEntry["content"],
display: boolean,
details?: unknown,
): CustomMessageEntry {
return this.appendEntry({
type: "custom_message",
customType,
content,
display,
details,
id: generateEntryId(this.byId),
parentId: this.leafId,
timestamp: new Date().toISOString(),
});
}
appendLabelChange(targetId: string, label: string | undefined): LabelEntry {
if (!this.byId.has(targetId)) {
throw new Error(`Entry ${targetId} not found`);
}
return this.appendEntry({
type: "label",
id: generateEntryId(this.byId),
parentId: this.leafId,
timestamp: new Date().toISOString(),
targetId,
label,
});
}
branchWithSummary(
branchFromId: string | null,
summary: string,
details?: unknown,
fromHook?: boolean,
): BranchSummaryEntry {
if (branchFromId !== null && !this.byId.has(branchFromId)) {
throw new Error(`Entry ${branchFromId} not found`);
}
this.leafId = branchFromId;
return this.appendEntry({
type: "branch_summary",
id: generateEntryId(this.byId),
parentId: branchFromId,
timestamp: new Date().toISOString(),
fromId: branchFromId ?? "root",
summary,
details,
fromHook,
});
}
private appendEntry<T extends SessionEntry>(entry: T): T {
this.entries.push(entry);
this.byId.set(entry.id, entry);
this.leafId = entry.id;
if (entry.type === "label") {
if (entry.label) {
this.labelsById.set(entry.targetId, entry.label);
this.labelTimestampsById.set(entry.targetId, entry.timestamp);
} else {
this.labelsById.delete(entry.targetId);
this.labelTimestampsById.delete(entry.targetId);
}
}
return entry;
}
}
export async function readTranscriptFileState(sessionFile: string): Promise<TranscriptFileState> {
const raw = await fs.readFile(sessionFile, "utf-8");
const fileEntries = parseSessionEntries(raw);
const headerBeforeMigration =
fileEntries.find((entry): entry is SessionHeader => entry.type === "session") ?? null;
const migrated = sessionHeaderVersion(headerBeforeMigration) < CURRENT_SESSION_VERSION;
migrateSessionEntries(fileEntries);
const header =
fileEntries.find((entry): entry is SessionHeader => entry.type === "session") ?? null;
const entries = fileEntries.filter(isSessionEntry);
return new TranscriptFileState({ header, entries, migrated });
}
export function serializeTranscriptState(state: TranscriptFileState): string {
return serializeTranscriptFileEntries([
...(state.header ? [state.header] : []),
...state.entries,
]);
}
export async function writeTranscriptFileAtomic(
filePath: string,
entries: Array<SessionHeader | SessionEntry>,
): Promise<void> {
const dir = path.dirname(filePath);
await fs.mkdir(dir, { recursive: true });
const tmpFile = path.join(dir, `.${path.basename(filePath)}.${process.pid}.${randomUUID()}.tmp`);
try {
await fs.writeFile(tmpFile, serializeTranscriptFileEntries(entries), {
encoding: "utf-8",
mode: 0o600,
flag: "wx",
});
await fs.rename(tmpFile, filePath);
} catch (err) {
await fs.unlink(tmpFile).catch(() => undefined);
throw err;
}
}
export async function persistTranscriptStateMutation(params: {
sessionFile: string;
state: TranscriptFileState;
appendedEntries: SessionEntry[];
}): Promise<void> {
if (params.appendedEntries.length === 0 && !params.state.migrated) {
return;
}
if (params.state.migrated) {
await writeTranscriptFileAtomic(params.sessionFile, [
...(params.state.header ? [params.state.header] : []),
...params.state.entries,
]);
return;
}
await fs.appendFile(
params.sessionFile,
params.appendedEntries.map((entry) => JSON.stringify(entry)).join("\n") + "\n",
"utf-8",
);
}

View File

@@ -1,3 +1,6 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
@@ -271,13 +274,39 @@ describe("rewriteTranscriptEntriesInSessionManager", () => {
});
describe("rewriteTranscriptEntriesInSessionFile", () => {
it("emits transcript updates when the active branch changes", async () => {
const sessionFile = "/tmp/session.jsonl";
const { sessionManager, toolResultEntryId } = createExecRewriteSession();
it("emits transcript updates when the active branch changes without opening a manager", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-transcript-rewrite-"));
const sessionManager = SessionManager.create(dir, dir);
const entryIds = appendSessionMessages(sessionManager, [
asAppendMessage({
role: "user",
content: "run tool",
timestamp: 1,
}),
asAppendMessage({
role: "toolResult",
toolCallId: "call_1",
toolName: "exec",
content: createTextContent("before rewrite"),
isError: false,
timestamp: 2,
}),
asAppendMessage({
role: "assistant",
content: createTextContent("summarized"),
timestamp: 3,
}),
]);
const sessionFile = sessionManager.getSessionFile();
expect(sessionFile).toBeTruthy();
if (!sessionFile) {
throw new Error("expected persisted session file");
}
const toolResultEntryId = entryIds[1];
const openSpy = vi
.spyOn(SessionManager, "open")
.mockReturnValue(sessionManager as unknown as ReturnType<typeof SessionManager.open>);
const openSpy = vi.spyOn(SessionManager, "open").mockImplementation(() => {
throw new Error("SessionManager.open should not be used for file rewrites");
});
const listener = vi.fn();
const cleanup = onSessionTranscriptUpdate(listener);
@@ -302,7 +331,9 @@ describe("rewriteTranscriptEntriesInSessionFile", () => {
expect(acquireSessionWriteLockReleaseMock).toHaveBeenCalledTimes(1);
expect(listener).toHaveBeenCalledWith({ sessionFile });
const rewrittenToolResult = getBranchMessages(sessionManager)[1] as Extract<
openSpy.mockRestore();
const rewrittenSession = SessionManager.open(sessionFile);
const rewrittenToolResult = getBranchMessages(rewrittenSession)[1] as Extract<
AgentMessage,
{ role: "toolResult" }
>;

View File

@@ -10,6 +10,11 @@ import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js
import { getRawSessionAppendMessage } from "../session-raw-append-message.js";
import { acquireSessionWriteLock } from "../session-write-lock.js";
import { log } from "./logger.js";
import {
persistTranscriptStateMutation,
readTranscriptFileState,
type TranscriptFileState,
} from "./transcript-file-state.js";
type SessionManagerLike = ReturnType<typeof SessionManager.open>;
type SessionBranchEntry = ReturnType<SessionManagerLike["getBranch"]>[number];
@@ -84,6 +89,58 @@ function appendBranchEntry(params: {
);
}
function appendTranscriptStateBranchEntry(params: {
state: TranscriptFileState;
entry: SessionBranchEntry;
rewrittenEntryIds: ReadonlyMap<string, string>;
}): SessionBranchEntry {
const { state, entry, rewrittenEntryIds } = params;
if (entry.type === "message") {
return state.appendMessage(entry.message);
}
if (entry.type === "compaction") {
return state.appendCompaction(
entry.summary,
remapEntryId(entry.firstKeptEntryId, rewrittenEntryIds) ?? entry.firstKeptEntryId,
entry.tokensBefore,
entry.details,
entry.fromHook,
);
}
if (entry.type === "thinking_level_change") {
return state.appendThinkingLevelChange(entry.thinkingLevel);
}
if (entry.type === "model_change") {
return state.appendModelChange(entry.provider, entry.modelId);
}
if (entry.type === "custom") {
return state.appendCustomEntry(entry.customType, entry.data);
}
if (entry.type === "custom_message") {
return state.appendCustomMessageEntry(
entry.customType,
entry.content,
entry.display,
entry.details,
);
}
if (entry.type === "session_info") {
return state.appendSessionInfo(entry.name ?? "");
}
if (entry.type === "branch_summary") {
return state.branchWithSummary(
remapEntryId(entry.parentId, rewrittenEntryIds),
entry.summary,
entry.details,
entry.fromHook,
);
}
return state.appendLabelChange(
remapEntryId(entry.targetId, rewrittenEntryIds) ?? entry.targetId,
entry.label,
);
}
/**
* Safely rewrites transcript message entries on the active branch by branching
* from the first rewritten message's parent and re-appending the suffix.
@@ -188,6 +245,108 @@ export function rewriteTranscriptEntriesInSessionManager(params: {
};
}
export function rewriteTranscriptEntriesInState(params: {
state: TranscriptFileState;
replacements: TranscriptRewriteReplacement[];
}): TranscriptRewriteResult & { appendedEntries: SessionBranchEntry[] } {
const replacementsById = new Map(
params.replacements
.filter((replacement) => replacement.entryId.trim().length > 0)
.map((replacement) => [replacement.entryId, replacement.message]),
);
if (replacementsById.size === 0) {
return {
changed: false,
bytesFreed: 0,
rewrittenEntries: 0,
reason: "no replacements requested",
appendedEntries: [],
};
}
const branch = params.state.getBranch();
if (branch.length === 0) {
return {
changed: false,
bytesFreed: 0,
rewrittenEntries: 0,
reason: "empty session",
appendedEntries: [],
};
}
const matchedIndices: number[] = [];
let bytesFreed = 0;
for (let index = 0; index < branch.length; index++) {
const entry = branch[index];
if (entry.type !== "message") {
continue;
}
const replacement = replacementsById.get(entry.id);
if (!replacement) {
continue;
}
const originalBytes = estimateMessageBytes(entry.message);
const replacementBytes = estimateMessageBytes(replacement);
matchedIndices.push(index);
bytesFreed += Math.max(0, originalBytes - replacementBytes);
}
if (matchedIndices.length === 0) {
return {
changed: false,
bytesFreed: 0,
rewrittenEntries: 0,
reason: "no matching message entries",
appendedEntries: [],
};
}
const firstMatchedEntry = branch[matchedIndices[0]] as
| Extract<SessionBranchEntry, { type: "message" }>
| undefined;
if (!firstMatchedEntry) {
return {
changed: false,
bytesFreed: 0,
rewrittenEntries: 0,
reason: "invalid first rewrite target",
appendedEntries: [],
};
}
if (!firstMatchedEntry.parentId) {
params.state.resetLeaf();
} else {
params.state.branch(firstMatchedEntry.parentId);
}
const appendedEntries: SessionBranchEntry[] = [];
const rewrittenEntryIds = new Map<string, string>();
for (let index = matchedIndices[0]; index < branch.length; index++) {
const entry = branch[index];
const replacement = entry.type === "message" ? replacementsById.get(entry.id) : undefined;
const newEntry =
replacement === undefined
? appendTranscriptStateBranchEntry({
state: params.state,
entry,
rewrittenEntryIds,
})
: params.state.appendMessage(replacement);
rewrittenEntryIds.set(entry.id, newEntry.id);
appendedEntries.push(newEntry);
}
return {
changed: true,
bytesFreed,
rewrittenEntries: matchedIndices.length,
appendedEntries,
};
}
/**
* Open a transcript file, rewrite message entries on the active branch, and
* emit a transcript update when the active branch changed.
@@ -203,12 +362,17 @@ export async function rewriteTranscriptEntriesInSessionFile(params: {
sessionLock = await acquireSessionWriteLock({
sessionFile: params.sessionFile,
});
const sessionManager = SessionManager.open(params.sessionFile);
const result = rewriteTranscriptEntriesInSessionManager({
sessionManager,
const state = await readTranscriptFileState(params.sessionFile);
const result = rewriteTranscriptEntriesInState({
state,
replacements: params.request.replacements,
});
if (result.changed) {
await persistTranscriptStateMutation({
sessionFile: params.sessionFile,
state,
appendedEntries: result.appendedEntries,
});
emitSessionTranscriptUpdate(params.sessionFile);
log.info(
`[transcript-rewrite] rewrote ${result.rewrittenEntries} entr` +