diff --git a/src/auto-reply/reply/session-fork.runtime.test.ts b/src/auto-reply/reply/session-fork.runtime.test.ts index 6bf4e75cd44..a1254dda20b 100644 --- a/src/auto-reply/reply/session-fork.runtime.test.ts +++ b/src/auto-reply/reply/session-fork.runtime.test.ts @@ -3,7 +3,10 @@ import os from "node:os"; import path from "node:path"; import { afterEach, describe, expect, it } from "vitest"; import type { SessionEntry } from "../../config/sessions/types.js"; -import { resolveParentForkTokenCountRuntime } from "./session-fork.runtime.js"; +import { + forkSessionFromParentRuntime, + resolveParentForkTokenCountRuntime, +} from "./session-fork.runtime.js"; const roots: string[] = []; @@ -71,3 +74,136 @@ describe("resolveParentForkTokenCountRuntime", () => { expect(tokens).toBeGreaterThan(100_000); }); }); + +describe("forkSessionFromParentRuntime", () => { + it("forks the active branch without synchronously opening the session manager", async () => { + const root = await makeRoot("openclaw-parent-fork-"); + const sessionsDir = path.join(root, "sessions"); + await fs.mkdir(sessionsDir); + const parentSessionFile = path.join(sessionsDir, "parent.jsonl"); + const cwd = path.join(root, "workspace"); + await fs.mkdir(cwd); + const parentSessionId = "parent-session"; + const lines = [ + { + type: "session", + version: 3, + id: parentSessionId, + timestamp: "2026-05-01T00:00:00.000Z", + cwd, + }, + { + type: "message", + id: "user-1", + parentId: null, + timestamp: "2026-05-01T00:00:01.000Z", + message: { role: "user", content: "hello" }, + }, + { + type: "message", + id: "assistant-1", + parentId: "user-1", + timestamp: "2026-05-01T00:00:02.000Z", + message: { + role: "assistant", + content: [{ type: "text", text: "hi" }], + api: "openai-responses", + provider: "openai", + model: "gpt-5.4", + stopReason: "stop", + timestamp: 2, + }, + }, + { + type: "label", + id: "label-1", + parentId: "assistant-1", + timestamp: "2026-05-01T00:00:03.000Z", + targetId: "user-1", + label: "start", + }, + ]; + await fs.writeFile( + parentSessionFile, + `${lines.map((entry) => JSON.stringify(entry)).join("\n")}\n`, + "utf-8", + ); + + const fork = await forkSessionFromParentRuntime({ + parentEntry: { + sessionId: parentSessionId, + sessionFile: parentSessionFile, + updatedAt: Date.now(), + }, + agentId: "main", + sessionsDir, + }); + + expect(fork).not.toBeNull(); + expect(fork?.sessionFile).toContain(sessionsDir); + expect(fork?.sessionId).not.toBe(parentSessionId); + const raw = await fs.readFile(fork?.sessionFile ?? "", "utf-8"); + const forkedEntries = raw + .trim() + .split(/\r?\n/u) + .map((line) => JSON.parse(line) as Record); + const resolvedParentSessionFile = await fs.realpath(parentSessionFile); + expect(forkedEntries[0]).toMatchObject({ + type: "session", + id: fork?.sessionId, + cwd, + parentSession: resolvedParentSessionFile, + }); + expect(forkedEntries.map((entry) => entry.type)).toEqual([ + "session", + "message", + "message", + "label", + ]); + expect(forkedEntries.at(-1)).toMatchObject({ + type: "label", + targetId: "user-1", + label: "start", + }); + }); + + it("creates a header-only child when the parent has no entries", async () => { + const root = await makeRoot("openclaw-parent-fork-empty-"); + const sessionsDir = path.join(root, "sessions"); + await fs.mkdir(sessionsDir); + const parentSessionFile = path.join(sessionsDir, "parent.jsonl"); + const parentSessionId = "parent-empty"; + await fs.writeFile( + parentSessionFile, + `${JSON.stringify({ + type: "session", + version: 3, + id: parentSessionId, + timestamp: "2026-05-01T00:00:00.000Z", + cwd: root, + })}\n`, + "utf-8", + ); + + const fork = await forkSessionFromParentRuntime({ + parentEntry: { + sessionId: parentSessionId, + sessionFile: parentSessionFile, + updatedAt: Date.now(), + }, + agentId: "main", + sessionsDir, + }); + + expect(fork).not.toBeNull(); + const raw = await fs.readFile(fork?.sessionFile ?? "", "utf-8"); + const lines = raw.trim().split(/\r?\n/u); + expect(lines).toHaveLength(1); + const resolvedParentSessionFile = await fs.realpath(parentSessionFile); + expect(JSON.parse(lines[0] ?? "{}")).toMatchObject({ + type: "session", + id: fork?.sessionId, + parentSession: resolvedParentSessionFile, + }); + }); +}); diff --git a/src/auto-reply/reply/session-fork.runtime.ts b/src/auto-reply/reply/session-fork.runtime.ts index 732c4fede9c..ef278cee4f0 100644 --- a/src/auto-reply/reply/session-fork.runtime.ts +++ b/src/auto-reply/reply/session-fork.runtime.ts @@ -1,13 +1,32 @@ import crypto from "node:crypto"; -import fs from "node:fs"; +import fs from "node:fs/promises"; import path from "node:path"; import type { AgentMessage } from "@mariozechner/pi-agent-core"; -import { CURRENT_SESSION_VERSION, SessionManager } from "@mariozechner/pi-coding-agent"; +import { + CURRENT_SESSION_VERSION, + migrateSessionEntries, + parseSessionEntries, + type FileEntry, + type SessionEntry as PiSessionEntry, + type SessionHeader, +} from "@mariozechner/pi-coding-agent"; +import { v7 as uuidv7 } from "uuid"; import { estimateMessagesTokens } from "../../agents/compaction.js"; import { resolveSessionFilePath } from "../../config/sessions/paths.js"; -import { resolveFreshSessionTotalTokens, type SessionEntry } from "../../config/sessions/types.js"; +import { + resolveFreshSessionTotalTokens, + type SessionEntry as StoreSessionEntry, +} from "../../config/sessions/types.js"; import { readSessionMessagesAsync } from "../../gateway/session-utils.fs.js"; +type ForkSourceTranscript = { + cwd: string; + sessionDir: string; + leafId: string | null; + branchEntries: PiSessionEntry[]; + labelsToWrite: Array<{ targetId: string; label: string; timestamp: string }>; +}; + function resolvePositiveTokenCount(value: number | undefined): number | undefined { return typeof value === "number" && Number.isFinite(value) && value > 0 ? Math.floor(value) @@ -15,7 +34,7 @@ function resolvePositiveTokenCount(value: number | undefined): number | undefine } export async function resolveParentForkTokenCountRuntime(params: { - parentEntry: SessionEntry; + parentEntry: StoreSessionEntry; storePath: string; }): Promise { const freshPersistedTokens = resolveFreshSessionTotalTokens(params.parentEntry); @@ -45,47 +64,216 @@ export async function resolveParentForkTokenCountRuntime(params: { return resolvePositiveTokenCount(params.parentEntry.totalTokens); } -export function forkSessionFromParentRuntime(params: { - parentEntry: SessionEntry; +function isSessionEntry(entry: FileEntry): entry is PiSessionEntry { + return ( + entry.type !== "session" && + typeof (entry as { id?: unknown }).id === "string" && + (typeof (entry as { timestamp?: unknown }).timestamp === "string" || + typeof (entry as { timestamp?: unknown }).timestamp === "number") + ); +} + +function buildEntryIndex(entries: PiSessionEntry[]): Map { + return new Map(entries.map((entry) => [entry.id, entry])); +} + +function readBranch(params: { + byId: Map; + leafId: string | null; +}): PiSessionEntry[] { + const branchEntries: PiSessionEntry[] = []; + let current = params.leafId ? params.byId.get(params.leafId) : undefined; + while (current) { + branchEntries.unshift(current); + current = current.parentId ? params.byId.get(current.parentId) : undefined; + } + return branchEntries; +} + +function generateEntryId(existingIds: Set): string { + for (let attempt = 0; attempt < 100; attempt += 1) { + const id = crypto.randomUUID().slice(0, 8); + if (!existingIds.has(id)) { + existingIds.add(id); + return id; + } + } + const id = crypto.randomUUID(); + existingIds.add(id); + return id; +} + +function collectBranchLabels(params: { + allEntries: PiSessionEntry[]; + pathEntryIds: Set; +}): Array<{ targetId: string; label: string; timestamp: string }> { + const labelsToWrite: Array<{ targetId: string; label: string; timestamp: string }> = []; + for (const entry of params.allEntries) { + if ( + entry.type === "label" && + entry.label && + params.pathEntryIds.has(entry.targetId) && + typeof entry.timestamp === "string" + ) { + labelsToWrite.push({ + targetId: entry.targetId, + label: entry.label, + timestamp: entry.timestamp, + }); + } + } + return labelsToWrite; +} + +async function readForkSourceTranscript( + parentSessionFile: string, +): Promise { + const raw = await fs.readFile(parentSessionFile, "utf-8"); + const fileEntries = parseSessionEntries(raw); + migrateSessionEntries(fileEntries); + const header = + fileEntries.find((entry): entry is SessionHeader => entry.type === "session") ?? null; + const entries = fileEntries.filter(isSessionEntry); + const byId = buildEntryIndex(entries); + const leafId = entries.at(-1)?.id ?? null; + const branchEntries = readBranch({ byId, leafId }); + const pathEntryIds = new Set( + branchEntries.filter((entry) => entry.type !== "label").map((entry) => entry.id), + ); + return { + cwd: header?.cwd ?? process.cwd(), + sessionDir: path.dirname(parentSessionFile), + leafId, + branchEntries, + labelsToWrite: collectBranchLabels({ allEntries: entries, pathEntryIds }), + }; +} + +function buildBranchLabelEntries(params: { + labelsToWrite: Array<{ targetId: string; label: string; timestamp: string }>; + pathEntryIds: Set; + lastEntryId: string | null; +}): PiSessionEntry[] { + let parentId = params.lastEntryId; + const labelEntries: PiSessionEntry[] = []; + for (const { targetId, label, timestamp } of params.labelsToWrite) { + const labelEntry = { + type: "label", + id: generateEntryId(params.pathEntryIds), + parentId, + timestamp, + targetId, + label, + } satisfies PiSessionEntry; + params.pathEntryIds.add(labelEntry.id); + labelEntries.push(labelEntry); + parentId = labelEntry.id; + } + return labelEntries; +} + +async function writeForkHeaderOnly(params: { + parentSessionFile: string; + sessionDir: string; + cwd: string; +}): Promise<{ sessionId: string; sessionFile: string }> { + const sessionId = uuidv7(); + const timestamp = new Date().toISOString(); + const fileTimestamp = timestamp.replace(/[:.]/g, "-"); + const sessionFile = path.join(params.sessionDir, `${fileTimestamp}_${sessionId}.jsonl`); + const header = { + type: "session", + version: CURRENT_SESSION_VERSION, + id: sessionId, + timestamp, + cwd: params.cwd, + parentSession: params.parentSessionFile, + } satisfies SessionHeader; + await fs.mkdir(path.dirname(sessionFile), { recursive: true }); + await fs.writeFile(sessionFile, `${JSON.stringify(header)}\n`, { + encoding: "utf-8", + mode: 0o600, + flag: "wx", + }); + return { sessionId, sessionFile }; +} + +async function writeBranchedSession(params: { + parentSessionFile: string; + source: ForkSourceTranscript; +}): Promise<{ sessionId: string; sessionFile: string }> { + const sessionId = uuidv7(); + const timestamp = new Date().toISOString(); + const fileTimestamp = timestamp.replace(/[:.]/g, "-"); + const sessionFile = path.join(params.source.sessionDir, `${fileTimestamp}_${sessionId}.jsonl`); + const pathWithoutLabels = params.source.branchEntries.filter((entry) => entry.type !== "label"); + const pathEntryIds = new Set(pathWithoutLabels.map((entry) => entry.id)); + const labelEntries = buildBranchLabelEntries({ + labelsToWrite: params.source.labelsToWrite, + pathEntryIds, + lastEntryId: pathWithoutLabels.at(-1)?.id ?? null, + }); + const header = { + type: "session", + version: CURRENT_SESSION_VERSION, + id: sessionId, + timestamp, + cwd: params.source.cwd, + parentSession: params.parentSessionFile, + } satisfies SessionHeader; + const entries = [header, ...pathWithoutLabels, ...labelEntries]; + const hasAssistant = entries.some( + (entry) => entry.type === "message" && entry.message.role === "assistant", + ); + if (hasAssistant) { + await fs.mkdir(path.dirname(sessionFile), { recursive: true }); + await fs.writeFile( + sessionFile, + `${entries.map((entry) => JSON.stringify(entry)).join("\n")}\n`, + { + encoding: "utf-8", + mode: 0o600, + flag: "wx", + }, + ); + } + return { sessionId, sessionFile }; +} + +async function fileExists(filePath: string): Promise { + try { + const stat = await fs.stat(filePath); + return stat.isFile(); + } catch { + return false; + } +} + +export async function forkSessionFromParentRuntime(params: { + parentEntry: StoreSessionEntry; agentId: string; sessionsDir: string; -}): { sessionId: string; sessionFile: string } | null { +}): Promise<{ sessionId: string; sessionFile: string } | null> { const parentSessionFile = resolveSessionFilePath( params.parentEntry.sessionId, params.parentEntry, { agentId: params.agentId, sessionsDir: params.sessionsDir }, ); - if (!parentSessionFile || !fs.existsSync(parentSessionFile)) { + if (!parentSessionFile || !(await fileExists(parentSessionFile))) { return null; } try { - const manager = SessionManager.open(parentSessionFile); - const leafId = manager.getLeafId(); - if (leafId) { - const sessionFile = manager.createBranchedSession(leafId) ?? manager.getSessionFile(); - const sessionId = manager.getSessionId(); - if (sessionFile && sessionId) { - return { sessionId, sessionFile }; - } + const source = await readForkSourceTranscript(parentSessionFile); + if (!source) { + return null; } - const sessionId = crypto.randomUUID(); - const timestamp = new Date().toISOString(); - const fileTimestamp = timestamp.replace(/[:.]/g, "-"); - const sessionFile = path.join(manager.getSessionDir(), `${fileTimestamp}_${sessionId}.jsonl`); - const header = { - type: "session", - version: CURRENT_SESSION_VERSION, - id: sessionId, - timestamp, - cwd: manager.getCwd(), - parentSession: parentSessionFile, - }; - fs.writeFileSync(sessionFile, `${JSON.stringify(header)}\n`, { - encoding: "utf-8", - mode: 0o600, - flag: "wx", - }); - return { sessionId, sessionFile }; + return source.leafId + ? await writeBranchedSession({ parentSessionFile, source }) + : await writeForkHeaderOnly({ + parentSessionFile, + sessionDir: source.sessionDir, + cwd: source.cwd, + }); } catch { return null; }