diff --git a/CHANGELOG.md b/CHANGELOG.md index 408814fbffa..fc0c3567aab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ Docs: https://docs.openclaw.ai ### Fixes - Feishu: preserve Feishu/Lark HTTP error bodies for message sends, media sends, and chat member lookups, so HTTP 400 failures include vendor code, message, log id, and troubleshooter details. Fixes #73860. Thanks @desksk. +- Agents/transcripts: avoid reopening large Pi transcript files through the synchronous session manager for maintenance rewrites, persisted tool-result truncation, manual compaction boundary hardening, and queued compaction rotation. Thanks @mariozechner. - Telegram: inherit the process DNS result order for Bot API transport and downgrade recovered sticky IPv4 fallback promotions to debug logs, while keeping pinned-IP escalation warnings visible. Fixes #75904. Thanks @highfly-hi and @neeravmakwana. - Web search/MiniMax: allow `MINIMAX_OAUTH_TOKEN` to satisfy MiniMax Search credentials, so OAuth-authorized MiniMax Token Plan setups do not need a separate web-search key. Fixes #65768. Thanks @kikibrian and @zhouhe-xydt. - Providers/MiniMax: derive Coding Plan usage polling from the configured MiniMax base URL, so global setups no longer query the CN usage host. Fixes #65054. Thanks @sixone74 and @Yanhu007. diff --git a/src/agents/pi-embedded-runner/compact.queued.ts b/src/agents/pi-embedded-runner/compact.queued.ts index 81a4df0a1be..c35cc4adb45 100644 --- a/src/agents/pi-embedded-runner/compact.queued.ts +++ b/src/agents/pi-embedded-runner/compact.queued.ts @@ -1,4 +1,3 @@ -import { SessionManager } from "@mariozechner/pi-coding-agent"; import { ensureContextEnginesInitialized } from "../../context-engine/init.js"; import { resolveContextEngine } from "../../context-engine/registry.js"; import type { ContextEngineRuntimeContext } from "../../context-engine/types.js"; @@ -28,7 +27,7 @@ import { resolveEmbeddedCompactionTarget, } from "./compaction-runtime-context.js"; import { - rotateTranscriptAfterCompaction, + rotateTranscriptFileAfterCompaction, shouldRotateCompactionTranscript, } from "./compaction-successor-transcript.js"; import { runContextEngineMaintenance } from "./context-engine-maintenance.js"; @@ -177,8 +176,7 @@ export async function compactEmbeddedPiSession( if (result.ok && result.compacted) { if (shouldRotateCompactionTranscript(params.config) && !delegatedRotatedTranscript) { try { - const rotation = await rotateTranscriptAfterCompaction({ - sessionManager: SessionManager.open(params.sessionFile), + const rotation = await rotateTranscriptFileAfterCompaction({ sessionFile: params.sessionFile, }); if (rotation.rotated) { diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index 6a077857cfa..c9290525666 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -153,6 +153,7 @@ import { toSessionToolAllowlist, } from "./tool-name-allowlist.js"; import { splitSdkTools } from "./tool-split.js"; +import { readTranscriptFileState } from "./transcript-file-state.js"; import type { EmbeddedPiCompactResult } from "./types.js"; import { mapThinkingLevel } from "./utils.js"; import { flushPendingToolResultsAfterIdle } from "./wait-for-idle-before-flush.js"; @@ -1172,7 +1173,9 @@ async function compactEmbeddedPiSessionDirectOnce( typeof sessionManager.getLeafId === "function" ? (sessionManager.getLeafId() ?? undefined) : undefined; - let transcriptRotationSessionManager = sessionManager; + let transcriptRotationSessionManager: Parameters< + typeof rotateTranscriptAfterCompaction + >[0]["sessionManager"] = sessionManager; if (params.trigger === "manual") { try { const hardenedBoundary = await hardenManualCompactionBoundary({ @@ -1185,7 +1188,9 @@ async function compactEmbeddedPiSessionDirectOnce( hardenedBoundary.firstKeptEntryId ?? effectiveFirstKeptEntryId; postCompactionLeafId = hardenedBoundary.leafId ?? postCompactionLeafId; session.agent.state.messages = hardenedBoundary.messages; - transcriptRotationSessionManager = SessionManager.open(params.sessionFile); + transcriptRotationSessionManager = await readTranscriptFileState( + params.sessionFile, + ); } } catch (err) { log.warn("[compaction] failed to harden manual compaction boundary", { diff --git a/src/agents/pi-embedded-runner/compaction-successor-transcript.test.ts b/src/agents/pi-embedded-runner/compaction-successor-transcript.test.ts index 94524628e80..da70a204f53 100644 --- a/src/agents/pi-embedded-runner/compaction-successor-transcript.test.ts +++ b/src/agents/pi-embedded-runner/compaction-successor-transcript.test.ts @@ -2,10 +2,11 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { SessionManager } from "@mariozechner/pi-coding-agent"; -import { afterEach, describe, expect, it } from "vitest"; +import { afterEach, describe, expect, it, vi } from "vitest"; import { makeAgentAssistantMessage } from "../test-helpers/agent-message-fixtures.js"; import { rotateTranscriptAfterCompaction, + rotateTranscriptFileAfterCompaction, shouldRotateCompactionTranscript, } from "./compaction-successor-transcript.js"; import { hardenManualCompactionBoundary } from "./manual-compaction-boundary.js"; @@ -54,6 +55,30 @@ function createCompactedSession(sessionDir: string): { } describe("rotateTranscriptAfterCompaction", () => { + it("can rotate a persisted transcript without opening a manager", async () => { + const dir = await createTmpDir(); + const { sessionFile } = createCompactedSession(dir); + + const openSpy = vi.spyOn(SessionManager, "open").mockImplementation(() => { + throw new Error("SessionManager.open should not be used for file rotation"); + }); + const result = await rotateTranscriptFileAfterCompaction({ + sessionFile, + now: () => new Date("2026-04-27T12:00:00.000Z"), + }); + openSpy.mockRestore(); + + expect(result.rotated).toBe(true); + expect(result.sessionFile).toBeTruthy(); + + const successor = SessionManager.open(result.sessionFile!); + expect(successor.getHeader()).toMatchObject({ + parentSession: sessionFile, + cwd: dir, + }); + expect(successor.buildSessionContext().messages.length).toBeGreaterThan(0); + }); + it("creates a compacted successor transcript and leaves the archive untouched", async () => { const dir = await createTmpDir(); const { manager, sessionFile, firstKeptId, oldUserId } = createCompactedSession(dir); diff --git a/src/agents/pi-embedded-runner/compaction-successor-transcript.ts b/src/agents/pi-embedded-runner/compaction-successor-transcript.ts index 878e4567599..7d73fde215a 100644 --- a/src/agents/pi-embedded-runner/compaction-successor-transcript.ts +++ b/src/agents/pi-embedded-runner/compaction-successor-transcript.ts @@ -1,18 +1,21 @@ import { randomUUID } from "node:crypto"; -import fs from "node:fs/promises"; import path from "node:path"; import { CURRENT_SESSION_VERSION, - SessionManager, type CompactionEntry, type SessionEntry, type SessionHeader, } from "@mariozechner/pi-coding-agent"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { collectDuplicateUserMessageEntryIdsForCompaction } from "./compaction-duplicate-user-messages.js"; +import { + readTranscriptFileState, + TranscriptFileState, + writeTranscriptFileAtomic, +} from "./transcript-file-state.js"; type ReadonlySessionManagerForRotation = Pick< - SessionManager, + TranscriptFileState, "buildSessionContext" | "getBranch" | "getCwd" | "getEntries" | "getHeader" >; @@ -70,14 +73,8 @@ export async function rotateTranscriptAfterCompaction(params: { cwd: params.sessionManager.getCwd(), parentSession: sessionFile, }); - await writeSessionFileAtomic(successorFile, [header, ...successorEntries]); - - try { - SessionManager.open(successorFile).buildSessionContext(); - } catch (err) { - await fs.unlink(successorFile).catch(() => undefined); - throw err; - } + await writeTranscriptFileAtomic(successorFile, [header, ...successorEntries]); + new TranscriptFileState({ header, entries: successorEntries }).buildSessionContext(); return { rotated: true, @@ -89,6 +86,18 @@ export async function rotateTranscriptAfterCompaction(params: { }; } +export async function rotateTranscriptFileAfterCompaction(params: { + sessionFile: string; + now?: () => Date; +}): Promise { + const state = await readTranscriptFileState(params.sessionFile); + return rotateTranscriptAfterCompaction({ + sessionManager: state, + sessionFile: params.sessionFile, + ...(params.now ? { now: params.now } : {}), + }); +} + function findLatestCompactionIndex(entries: SessionEntry[]): number { for (let index = entries.length - 1; index >= 0; index -= 1) { if (entries[index]?.type === "compaction") { @@ -263,20 +272,3 @@ function resolveSuccessorSessionFile(params: { const fileTimestamp = params.timestamp.replace(/[:.]/g, "-"); return path.join(path.dirname(params.sessionFile), `${fileTimestamp}_${params.sessionId}.jsonl`); } - -async function writeSessionFileAtomic( - filePath: string, - entries: Array, -) { - const dir = path.dirname(filePath); - await fs.mkdir(dir, { recursive: true }); - const tmpFile = path.join(dir, `.${path.basename(filePath)}.${process.pid}.${randomUUID()}.tmp`); - const content = `${entries.map((entry) => JSON.stringify(entry)).join("\n")}\n`; - try { - await fs.writeFile(tmpFile, content, { encoding: "utf8", flag: "wx" }); - await fs.rename(tmpFile, filePath); - } catch (err) { - await fs.unlink(tmpFile).catch(() => undefined); - throw err; - } -} diff --git a/src/agents/pi-embedded-runner/manual-compaction-boundary.test.ts b/src/agents/pi-embedded-runner/manual-compaction-boundary.test.ts index 85867b31e14..d5a606cc8de 100644 --- a/src/agents/pi-embedded-runner/manual-compaction-boundary.test.ts +++ b/src/agents/pi-embedded-runner/manual-compaction-boundary.test.ts @@ -4,7 +4,7 @@ import path from "node:path"; import type { AgentMessage } from "@mariozechner/pi-agent-core"; import type { AssistantMessage } from "@mariozechner/pi-ai"; import { SessionManager } from "@mariozechner/pi-coding-agent"; -import { afterEach, describe, expect, it } from "vitest"; +import { afterEach, describe, expect, it, vi } from "vitest"; import { hardenManualCompactionBoundary } from "./manual-compaction-boundary.js"; let tmpDir = ""; @@ -95,7 +95,11 @@ describe("hardenManualCompactionBoundary", () => { .messages.map((message) => messageText(message)); expect(beforeTexts.join("\n")).toContain("detailed new answer"); + const openSpy = vi.spyOn(SessionManager, "open").mockImplementation(() => { + throw new Error("SessionManager.open should not be used for boundary hardening"); + }); const hardened = await hardenManualCompactionBoundary({ sessionFile: sessionFile! }); + openSpy.mockRestore(); expect(hardened.applied).toBe(true); expect(hardened.firstKeptEntryId).toBe(latestCompactionId); expect(hardened.messages.map((message) => message.role)).toEqual(["compactionSummary"]); diff --git a/src/agents/pi-embedded-runner/manual-compaction-boundary.ts b/src/agents/pi-embedded-runner/manual-compaction-boundary.ts index 3c0e26c14f7..c615d877c67 100644 --- a/src/agents/pi-embedded-runner/manual-compaction-boundary.ts +++ b/src/agents/pi-embedded-runner/manual-compaction-boundary.ts @@ -1,10 +1,11 @@ -import fs from "node:fs/promises"; import type { AgentMessage } from "@mariozechner/pi-agent-core"; -import { SessionManager } from "@mariozechner/pi-coding-agent"; +import type { SessionEntry } from "@mariozechner/pi-coding-agent"; +import { + readTranscriptFileState, + TranscriptFileState, + writeTranscriptFileAtomic, +} from "./transcript-file-state.js"; -type SessionManagerLike = ReturnType; -type SessionEntry = ReturnType[number]; -type SessionHeader = NonNullable>; type CompactionEntry = Extract; export type HardenedManualCompactionBoundary = { @@ -14,12 +15,6 @@ export type HardenedManualCompactionBoundary = { messages: AgentMessage[]; }; -function serializeSessionFile(header: SessionHeader, entries: SessionEntry[]): string { - return ( - [JSON.stringify(header), ...entries.map((entry) => JSON.stringify(entry))].join("\n") + "\n" - ); -} - function replaceLatestCompactionBoundary(params: { entries: SessionEntry[]; compactionEntryId: string; @@ -42,76 +37,60 @@ export async function hardenManualCompactionBoundary(params: { sessionFile: string; preserveRecentTail?: boolean; }): Promise { - const sessionManager = SessionManager.open(params.sessionFile) as Partial; - if ( - typeof sessionManager.getHeader !== "function" || - typeof sessionManager.getLeafEntry !== "function" || - typeof sessionManager.buildSessionContext !== "function" || - typeof sessionManager.getEntries !== "function" - ) { + const state = await readTranscriptFileState(params.sessionFile); + const header = state.getHeader(); + if (!header) { return { applied: false, messages: [], }; } - const header = sessionManager.getHeader(); - const leaf = sessionManager.getLeafEntry(); - if (!header || leaf?.type !== "compaction") { - const sessionContext = sessionManager.buildSessionContext(); + const leaf = state.getLeafEntry(); + if (leaf?.type !== "compaction") { + const sessionContext = state.buildSessionContext(); return { applied: false, - leafId: - typeof sessionManager.getLeafId === "function" - ? (sessionManager.getLeafId() ?? undefined) - : undefined, + leafId: state.getLeafId() ?? undefined, messages: sessionContext.messages, }; } if (params.preserveRecentTail) { - const sessionContext = sessionManager.buildSessionContext(); + const sessionContext = state.buildSessionContext(); return { applied: false, firstKeptEntryId: leaf.firstKeptEntryId, - leafId: - typeof sessionManager.getLeafId === "function" - ? (sessionManager.getLeafId() ?? undefined) - : undefined, + leafId: state.getLeafId() ?? undefined, messages: sessionContext.messages, }; } if (leaf.firstKeptEntryId === leaf.id) { - const sessionContext = sessionManager.buildSessionContext(); + const sessionContext = state.buildSessionContext(); return { applied: false, firstKeptEntryId: leaf.id, - leafId: - typeof sessionManager.getLeafId === "function" - ? (sessionManager.getLeafId() ?? undefined) - : undefined, + leafId: state.getLeafId() ?? undefined, messages: sessionContext.messages, }; } - const content = serializeSessionFile( + const replacedEntries = replaceLatestCompactionBoundary({ + entries: state.getEntries(), + compactionEntryId: leaf.id, + }); + const replacedState = new TranscriptFileState({ header, - replaceLatestCompactionBoundary({ - entries: sessionManager.getEntries(), - compactionEntryId: leaf.id, - }), - ); - const tmpFile = `${params.sessionFile}.manual-compaction-tmp`; - await fs.writeFile(tmpFile, content, "utf-8"); - await fs.rename(tmpFile, params.sessionFile); + entries: replacedEntries, + }); + await writeTranscriptFileAtomic(params.sessionFile, [header, ...replacedEntries]); - const refreshed = SessionManager.open(params.sessionFile); - const sessionContext = refreshed.buildSessionContext(); + const sessionContext = replacedState.buildSessionContext(); return { applied: true, firstKeptEntryId: leaf.id, - leafId: refreshed.getLeafId() ?? undefined, + leafId: replacedState.getLeafId() ?? undefined, messages: sessionContext.messages, }; } diff --git a/src/agents/pi-embedded-runner/tool-result-truncation.test.ts b/src/agents/pi-embedded-runner/tool-result-truncation.test.ts index 16ca994fd39..750d4ba3774 100644 --- a/src/agents/pi-embedded-runner/tool-result-truncation.test.ts +++ b/src/agents/pi-embedded-runner/tool-result-truncation.test.ts @@ -4,7 +4,7 @@ import path from "node:path"; import type { AgentMessage } from "@mariozechner/pi-agent-core"; import type { AssistantMessage, ToolResultMessage, UserMessage } from "@mariozechner/pi-ai"; import { SessionManager } from "@mariozechner/pi-coding-agent"; -import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { makeAgentAssistantMessage } from "../test-helpers/agent-message-fixtures.js"; let truncateToolResultText: typeof import("./tool-result-truncation.js").truncateToolResultText; @@ -441,10 +441,14 @@ describe("truncateOversizedToolResultsInSession", () => { ) .filter((length) => length > 0); + const openSpy = vi.spyOn(SessionManager, "open").mockImplementation(() => { + throw new Error("SessionManager.open should not be used for persisted truncation"); + }); const result = await truncateOversizedToolResultsInSession({ sessionFile, contextWindowTokens: 100, }); + openSpy.mockRestore(); expect(result.truncated).toBe(true); expect(result.truncatedCount).toBeGreaterThan(0); diff --git a/src/agents/pi-embedded-runner/tool-result-truncation.ts b/src/agents/pi-embedded-runner/tool-result-truncation.ts index c8fdc29382a..aec21afc018 100644 --- a/src/agents/pi-embedded-runner/tool-result-truncation.ts +++ b/src/agents/pi-embedded-runner/tool-result-truncation.ts @@ -9,7 +9,15 @@ import { resolveAgentContextLimits } from "../agent-scope.js"; import { acquireSessionWriteLock } from "../session-write-lock.js"; import { formatContextLimitTruncationNotice } from "./context-truncation-notice.js"; import { log } from "./logger.js"; -import { rewriteTranscriptEntriesInSessionManager } from "./transcript-rewrite.js"; +import { + persistTranscriptStateMutation, + readTranscriptFileState, + type TranscriptFileState, +} from "./transcript-file-state.js"; +import { + rewriteTranscriptEntriesInSessionManager, + rewriteTranscriptEntriesInState, +} from "./transcript-rewrite.js"; /** * Maximum share of the context window a single tool result should occupy. @@ -664,6 +672,69 @@ function truncateOversizedToolResultsInExistingSessionManager(params: { }; } +async function truncateOversizedToolResultsInTranscriptState(params: { + state: TranscriptFileState; + sessionFile: string; + contextWindowTokens: number; + maxCharsOverride?: number; + sessionId?: string; + sessionKey?: string; +}): Promise<{ truncated: boolean; truncatedCount: number; reason?: string }> { + const { state, contextWindowTokens } = params; + const maxChars = Math.max( + 1, + params.maxCharsOverride ?? calculateMaxToolResultChars(contextWindowTokens), + ); + const aggregateBudgetChars = calculateRecoveryAggregateToolResultChars( + contextWindowTokens, + maxChars, + ); + const branch = state.getBranch() as ToolResultBranchEntry[]; + + if (branch.length === 0) { + return { truncated: false, truncatedCount: 0, reason: "empty session" }; + } + + const plan = buildToolResultReplacementPlan({ + branch, + maxChars, + aggregateBudgetChars, + minKeepChars: RECOVERY_MIN_KEEP_CHARS, + }); + if (plan.replacements.length === 0) { + return { + truncated: false, + truncatedCount: 0, + reason: "no oversized or aggregate tool results", + }; + } + const rewriteResult = rewriteTranscriptEntriesInState({ + state, + replacements: plan.replacements, + }); + if (rewriteResult.changed) { + await persistTranscriptStateMutation({ + sessionFile: params.sessionFile, + state, + appendedEntries: rewriteResult.appendedEntries, + }); + emitSessionTranscriptUpdate(params.sessionFile); + } + + log.info( + `[tool-result-truncation] Truncated ${rewriteResult.rewrittenEntries} tool result(s) in session ` + + `(contextWindow=${contextWindowTokens} maxChars=${maxChars} aggregateBudgetChars=${aggregateBudgetChars} ` + + `oversized=${plan.oversizedReplacementCount} aggregate=${plan.aggregateReplacementCount}) ` + + `sessionKey=${params.sessionKey ?? params.sessionId ?? "unknown"}`, + ); + + return { + truncated: rewriteResult.changed, + truncatedCount: rewriteResult.rewrittenEntries, + reason: rewriteResult.reason, + }; +} + export function truncateOversizedToolResultsInSessionManager(params: { sessionManager: SessionManager; contextWindowTokens: number; @@ -693,9 +764,9 @@ export async function truncateOversizedToolResultsInSession(params: { try { sessionLock = await acquireSessionWriteLock({ sessionFile }); - const sessionManager = SessionManager.open(sessionFile); - return truncateOversizedToolResultsInExistingSessionManager({ - sessionManager, + const state = await readTranscriptFileState(sessionFile); + return await truncateOversizedToolResultsInTranscriptState({ + state, contextWindowTokens, maxCharsOverride: params.maxCharsOverride, sessionFile, diff --git a/src/agents/pi-embedded-runner/transcript-file-state.ts b/src/agents/pi-embedded-runner/transcript-file-state.ts new file mode 100644 index 00000000000..bdfa6d4f7c4 --- /dev/null +++ b/src/agents/pi-embedded-runner/transcript-file-state.ts @@ -0,0 +1,339 @@ +import { randomUUID } from "node:crypto"; +import fs from "node:fs/promises"; +import path from "node:path"; +import { + buildSessionContext, + CURRENT_SESSION_VERSION, + migrateSessionEntries, + parseSessionEntries, + type FileEntry, + type SessionContext, + type SessionEntry, + type SessionHeader, +} from "@mariozechner/pi-coding-agent"; + +type BranchSummaryEntry = Extract; +type CompactionEntry = Extract; +type CustomEntry = Extract; +type CustomMessageEntry = Extract; +type LabelEntry = Extract; +type ModelChangeEntry = Extract; +type SessionInfoEntry = Extract; +type SessionMessageEntry = Extract; +type ThinkingLevelChangeEntry = Extract; + +function isSessionEntry(entry: FileEntry): entry is SessionEntry { + return entry.type !== "session"; +} + +function sessionHeaderVersion(header: SessionHeader | null): number { + return typeof header?.version === "number" ? header.version : 1; +} + +function generateEntryId(byId: { has(id: string): boolean }): string { + for (let attempt = 0; attempt < 100; attempt += 1) { + const id = randomUUID().slice(0, 8); + if (!byId.has(id)) { + return id; + } + } + return randomUUID(); +} + +function serializeTranscriptFileEntries(entries: FileEntry[]): string { + return `${entries.map((entry) => JSON.stringify(entry)).join("\n")}\n`; +} + +export class TranscriptFileState { + readonly header: SessionHeader | null; + readonly entries: SessionEntry[]; + readonly migrated: boolean; + private readonly byId = new Map(); + private readonly labelsById = new Map(); + private readonly labelTimestampsById = new Map(); + private leafId: string | null = null; + + constructor(params: { + header: SessionHeader | null; + entries: SessionEntry[]; + migrated?: boolean; + }) { + this.header = params.header; + this.entries = [...params.entries]; + this.migrated = params.migrated === true; + this.rebuildIndex(); + } + + private rebuildIndex(): void { + this.byId.clear(); + this.labelsById.clear(); + this.labelTimestampsById.clear(); + this.leafId = null; + for (const entry of this.entries) { + this.byId.set(entry.id, entry); + this.leafId = entry.id; + if (entry.type === "label") { + if (entry.label) { + this.labelsById.set(entry.targetId, entry.label); + this.labelTimestampsById.set(entry.targetId, entry.timestamp); + } else { + this.labelsById.delete(entry.targetId); + this.labelTimestampsById.delete(entry.targetId); + } + } + } + } + + getCwd(): string { + return this.header?.cwd ?? process.cwd(); + } + + getHeader(): SessionHeader | null { + return this.header; + } + + getEntries(): SessionEntry[] { + return [...this.entries]; + } + + getLeafId(): string | null { + return this.leafId; + } + + getLeafEntry(): SessionEntry | undefined { + return this.leafId ? this.byId.get(this.leafId) : undefined; + } + + getLabel(id: string): string | undefined { + return this.labelsById.get(id); + } + + getBranch(fromId?: string): SessionEntry[] { + const branch: SessionEntry[] = []; + let current = (fromId ?? this.leafId) ? this.byId.get((fromId ?? this.leafId)!) : undefined; + while (current) { + branch.unshift(current); + current = current.parentId ? this.byId.get(current.parentId) : undefined; + } + return branch; + } + + buildSessionContext(): SessionContext { + return buildSessionContext(this.entries, this.leafId, this.byId); + } + + branch(branchFromId: string): void { + if (!this.byId.has(branchFromId)) { + throw new Error(`Entry ${branchFromId} not found`); + } + this.leafId = branchFromId; + } + + resetLeaf(): void { + this.leafId = null; + } + + appendMessage(message: SessionMessageEntry["message"]): SessionMessageEntry { + return this.appendEntry({ + type: "message", + id: generateEntryId(this.byId), + parentId: this.leafId, + timestamp: new Date().toISOString(), + message, + }); + } + + appendThinkingLevelChange(thinkingLevel: string): ThinkingLevelChangeEntry { + return this.appendEntry({ + type: "thinking_level_change", + id: generateEntryId(this.byId), + parentId: this.leafId, + timestamp: new Date().toISOString(), + thinkingLevel, + }); + } + + appendModelChange(provider: string, modelId: string): ModelChangeEntry { + return this.appendEntry({ + type: "model_change", + id: generateEntryId(this.byId), + parentId: this.leafId, + timestamp: new Date().toISOString(), + provider, + modelId, + }); + } + + appendCompaction( + summary: string, + firstKeptEntryId: string, + tokensBefore: number, + details?: unknown, + fromHook?: boolean, + ): CompactionEntry { + return this.appendEntry({ + type: "compaction", + id: generateEntryId(this.byId), + parentId: this.leafId, + timestamp: new Date().toISOString(), + summary, + firstKeptEntryId, + tokensBefore, + details, + fromHook, + }); + } + + appendCustomEntry(customType: string, data?: unknown): CustomEntry { + return this.appendEntry({ + type: "custom", + customType, + data, + id: generateEntryId(this.byId), + parentId: this.leafId, + timestamp: new Date().toISOString(), + }); + } + + appendSessionInfo(name: string): SessionInfoEntry { + return this.appendEntry({ + type: "session_info", + id: generateEntryId(this.byId), + parentId: this.leafId, + timestamp: new Date().toISOString(), + name: name.trim(), + }); + } + + appendCustomMessageEntry( + customType: string, + content: CustomMessageEntry["content"], + display: boolean, + details?: unknown, + ): CustomMessageEntry { + return this.appendEntry({ + type: "custom_message", + customType, + content, + display, + details, + id: generateEntryId(this.byId), + parentId: this.leafId, + timestamp: new Date().toISOString(), + }); + } + + appendLabelChange(targetId: string, label: string | undefined): LabelEntry { + if (!this.byId.has(targetId)) { + throw new Error(`Entry ${targetId} not found`); + } + return this.appendEntry({ + type: "label", + id: generateEntryId(this.byId), + parentId: this.leafId, + timestamp: new Date().toISOString(), + targetId, + label, + }); + } + + branchWithSummary( + branchFromId: string | null, + summary: string, + details?: unknown, + fromHook?: boolean, + ): BranchSummaryEntry { + if (branchFromId !== null && !this.byId.has(branchFromId)) { + throw new Error(`Entry ${branchFromId} not found`); + } + this.leafId = branchFromId; + return this.appendEntry({ + type: "branch_summary", + id: generateEntryId(this.byId), + parentId: branchFromId, + timestamp: new Date().toISOString(), + fromId: branchFromId ?? "root", + summary, + details, + fromHook, + }); + } + + private appendEntry(entry: T): T { + this.entries.push(entry); + this.byId.set(entry.id, entry); + this.leafId = entry.id; + if (entry.type === "label") { + if (entry.label) { + this.labelsById.set(entry.targetId, entry.label); + this.labelTimestampsById.set(entry.targetId, entry.timestamp); + } else { + this.labelsById.delete(entry.targetId); + this.labelTimestampsById.delete(entry.targetId); + } + } + return entry; + } +} + +export async function readTranscriptFileState(sessionFile: string): Promise { + const raw = await fs.readFile(sessionFile, "utf-8"); + const fileEntries = parseSessionEntries(raw); + const headerBeforeMigration = + fileEntries.find((entry): entry is SessionHeader => entry.type === "session") ?? null; + const migrated = sessionHeaderVersion(headerBeforeMigration) < CURRENT_SESSION_VERSION; + migrateSessionEntries(fileEntries); + const header = + fileEntries.find((entry): entry is SessionHeader => entry.type === "session") ?? null; + const entries = fileEntries.filter(isSessionEntry); + return new TranscriptFileState({ header, entries, migrated }); +} + +export function serializeTranscriptState(state: TranscriptFileState): string { + return serializeTranscriptFileEntries([ + ...(state.header ? [state.header] : []), + ...state.entries, + ]); +} + +export async function writeTranscriptFileAtomic( + filePath: string, + entries: Array, +): Promise { + const dir = path.dirname(filePath); + await fs.mkdir(dir, { recursive: true }); + const tmpFile = path.join(dir, `.${path.basename(filePath)}.${process.pid}.${randomUUID()}.tmp`); + try { + await fs.writeFile(tmpFile, serializeTranscriptFileEntries(entries), { + encoding: "utf-8", + mode: 0o600, + flag: "wx", + }); + await fs.rename(tmpFile, filePath); + } catch (err) { + await fs.unlink(tmpFile).catch(() => undefined); + throw err; + } +} + +export async function persistTranscriptStateMutation(params: { + sessionFile: string; + state: TranscriptFileState; + appendedEntries: SessionEntry[]; +}): Promise { + if (params.appendedEntries.length === 0 && !params.state.migrated) { + return; + } + if (params.state.migrated) { + await writeTranscriptFileAtomic(params.sessionFile, [ + ...(params.state.header ? [params.state.header] : []), + ...params.state.entries, + ]); + return; + } + await fs.appendFile( + params.sessionFile, + params.appendedEntries.map((entry) => JSON.stringify(entry)).join("\n") + "\n", + "utf-8", + ); +} diff --git a/src/agents/pi-embedded-runner/transcript-rewrite.test.ts b/src/agents/pi-embedded-runner/transcript-rewrite.test.ts index 84cde33a15a..d1ff459375c 100644 --- a/src/agents/pi-embedded-runner/transcript-rewrite.test.ts +++ b/src/agents/pi-embedded-runner/transcript-rewrite.test.ts @@ -1,3 +1,6 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; import type { AgentMessage } from "@mariozechner/pi-agent-core"; import { SessionManager } from "@mariozechner/pi-coding-agent"; import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; @@ -271,13 +274,39 @@ describe("rewriteTranscriptEntriesInSessionManager", () => { }); describe("rewriteTranscriptEntriesInSessionFile", () => { - it("emits transcript updates when the active branch changes", async () => { - const sessionFile = "/tmp/session.jsonl"; - const { sessionManager, toolResultEntryId } = createExecRewriteSession(); + it("emits transcript updates when the active branch changes without opening a manager", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-transcript-rewrite-")); + const sessionManager = SessionManager.create(dir, dir); + const entryIds = appendSessionMessages(sessionManager, [ + asAppendMessage({ + role: "user", + content: "run tool", + timestamp: 1, + }), + asAppendMessage({ + role: "toolResult", + toolCallId: "call_1", + toolName: "exec", + content: createTextContent("before rewrite"), + isError: false, + timestamp: 2, + }), + asAppendMessage({ + role: "assistant", + content: createTextContent("summarized"), + timestamp: 3, + }), + ]); + const sessionFile = sessionManager.getSessionFile(); + expect(sessionFile).toBeTruthy(); + if (!sessionFile) { + throw new Error("expected persisted session file"); + } + const toolResultEntryId = entryIds[1]; - const openSpy = vi - .spyOn(SessionManager, "open") - .mockReturnValue(sessionManager as unknown as ReturnType); + const openSpy = vi.spyOn(SessionManager, "open").mockImplementation(() => { + throw new Error("SessionManager.open should not be used for file rewrites"); + }); const listener = vi.fn(); const cleanup = onSessionTranscriptUpdate(listener); @@ -302,7 +331,9 @@ describe("rewriteTranscriptEntriesInSessionFile", () => { expect(acquireSessionWriteLockReleaseMock).toHaveBeenCalledTimes(1); expect(listener).toHaveBeenCalledWith({ sessionFile }); - const rewrittenToolResult = getBranchMessages(sessionManager)[1] as Extract< + openSpy.mockRestore(); + const rewrittenSession = SessionManager.open(sessionFile); + const rewrittenToolResult = getBranchMessages(rewrittenSession)[1] as Extract< AgentMessage, { role: "toolResult" } >; diff --git a/src/agents/pi-embedded-runner/transcript-rewrite.ts b/src/agents/pi-embedded-runner/transcript-rewrite.ts index 42eafc2b0af..011e711a1c8 100644 --- a/src/agents/pi-embedded-runner/transcript-rewrite.ts +++ b/src/agents/pi-embedded-runner/transcript-rewrite.ts @@ -10,6 +10,11 @@ import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js import { getRawSessionAppendMessage } from "../session-raw-append-message.js"; import { acquireSessionWriteLock } from "../session-write-lock.js"; import { log } from "./logger.js"; +import { + persistTranscriptStateMutation, + readTranscriptFileState, + type TranscriptFileState, +} from "./transcript-file-state.js"; type SessionManagerLike = ReturnType; type SessionBranchEntry = ReturnType[number]; @@ -84,6 +89,58 @@ function appendBranchEntry(params: { ); } +function appendTranscriptStateBranchEntry(params: { + state: TranscriptFileState; + entry: SessionBranchEntry; + rewrittenEntryIds: ReadonlyMap; +}): SessionBranchEntry { + const { state, entry, rewrittenEntryIds } = params; + if (entry.type === "message") { + return state.appendMessage(entry.message); + } + if (entry.type === "compaction") { + return state.appendCompaction( + entry.summary, + remapEntryId(entry.firstKeptEntryId, rewrittenEntryIds) ?? entry.firstKeptEntryId, + entry.tokensBefore, + entry.details, + entry.fromHook, + ); + } + if (entry.type === "thinking_level_change") { + return state.appendThinkingLevelChange(entry.thinkingLevel); + } + if (entry.type === "model_change") { + return state.appendModelChange(entry.provider, entry.modelId); + } + if (entry.type === "custom") { + return state.appendCustomEntry(entry.customType, entry.data); + } + if (entry.type === "custom_message") { + return state.appendCustomMessageEntry( + entry.customType, + entry.content, + entry.display, + entry.details, + ); + } + if (entry.type === "session_info") { + return state.appendSessionInfo(entry.name ?? ""); + } + if (entry.type === "branch_summary") { + return state.branchWithSummary( + remapEntryId(entry.parentId, rewrittenEntryIds), + entry.summary, + entry.details, + entry.fromHook, + ); + } + return state.appendLabelChange( + remapEntryId(entry.targetId, rewrittenEntryIds) ?? entry.targetId, + entry.label, + ); +} + /** * Safely rewrites transcript message entries on the active branch by branching * from the first rewritten message's parent and re-appending the suffix. @@ -188,6 +245,108 @@ export function rewriteTranscriptEntriesInSessionManager(params: { }; } +export function rewriteTranscriptEntriesInState(params: { + state: TranscriptFileState; + replacements: TranscriptRewriteReplacement[]; +}): TranscriptRewriteResult & { appendedEntries: SessionBranchEntry[] } { + const replacementsById = new Map( + params.replacements + .filter((replacement) => replacement.entryId.trim().length > 0) + .map((replacement) => [replacement.entryId, replacement.message]), + ); + if (replacementsById.size === 0) { + return { + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + reason: "no replacements requested", + appendedEntries: [], + }; + } + + const branch = params.state.getBranch(); + if (branch.length === 0) { + return { + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + reason: "empty session", + appendedEntries: [], + }; + } + + const matchedIndices: number[] = []; + let bytesFreed = 0; + + for (let index = 0; index < branch.length; index++) { + const entry = branch[index]; + if (entry.type !== "message") { + continue; + } + const replacement = replacementsById.get(entry.id); + if (!replacement) { + continue; + } + const originalBytes = estimateMessageBytes(entry.message); + const replacementBytes = estimateMessageBytes(replacement); + matchedIndices.push(index); + bytesFreed += Math.max(0, originalBytes - replacementBytes); + } + + if (matchedIndices.length === 0) { + return { + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + reason: "no matching message entries", + appendedEntries: [], + }; + } + + const firstMatchedEntry = branch[matchedIndices[0]] as + | Extract + | undefined; + if (!firstMatchedEntry) { + return { + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + reason: "invalid first rewrite target", + appendedEntries: [], + }; + } + + if (!firstMatchedEntry.parentId) { + params.state.resetLeaf(); + } else { + params.state.branch(firstMatchedEntry.parentId); + } + + const appendedEntries: SessionBranchEntry[] = []; + const rewrittenEntryIds = new Map(); + for (let index = matchedIndices[0]; index < branch.length; index++) { + const entry = branch[index]; + const replacement = entry.type === "message" ? replacementsById.get(entry.id) : undefined; + const newEntry = + replacement === undefined + ? appendTranscriptStateBranchEntry({ + state: params.state, + entry, + rewrittenEntryIds, + }) + : params.state.appendMessage(replacement); + rewrittenEntryIds.set(entry.id, newEntry.id); + appendedEntries.push(newEntry); + } + + return { + changed: true, + bytesFreed, + rewrittenEntries: matchedIndices.length, + appendedEntries, + }; +} + /** * Open a transcript file, rewrite message entries on the active branch, and * emit a transcript update when the active branch changed. @@ -203,12 +362,17 @@ export async function rewriteTranscriptEntriesInSessionFile(params: { sessionLock = await acquireSessionWriteLock({ sessionFile: params.sessionFile, }); - const sessionManager = SessionManager.open(params.sessionFile); - const result = rewriteTranscriptEntriesInSessionManager({ - sessionManager, + const state = await readTranscriptFileState(params.sessionFile); + const result = rewriteTranscriptEntriesInState({ + state, replacements: params.request.replacements, }); if (result.changed) { + await persistTranscriptStateMutation({ + sessionFile: params.sessionFile, + state, + appendedEntries: result.appendedEntries, + }); emitSessionTranscriptUpdate(params.sessionFile); log.info( `[transcript-rewrite] rewrote ${result.rewrittenEntries} entr` +