refactor: fork parent sessions asynchronously

This commit is contained in:
Peter Steinberger
2026-05-02 05:08:36 +01:00
committed by GitHub
parent a7237ea44f
commit ee94d21f1f
2 changed files with 359 additions and 35 deletions

View File

@@ -3,7 +3,10 @@ import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it } from "vitest";
import type { SessionEntry } from "../../config/sessions/types.js";
import { resolveParentForkTokenCountRuntime } from "./session-fork.runtime.js";
import {
forkSessionFromParentRuntime,
resolveParentForkTokenCountRuntime,
} from "./session-fork.runtime.js";
const roots: string[] = [];
@@ -71,3 +74,136 @@ describe("resolveParentForkTokenCountRuntime", () => {
expect(tokens).toBeGreaterThan(100_000);
});
});
describe("forkSessionFromParentRuntime", () => {
it("forks the active branch without synchronously opening the session manager", async () => {
const root = await makeRoot("openclaw-parent-fork-");
const sessionsDir = path.join(root, "sessions");
await fs.mkdir(sessionsDir);
const parentSessionFile = path.join(sessionsDir, "parent.jsonl");
const cwd = path.join(root, "workspace");
await fs.mkdir(cwd);
const parentSessionId = "parent-session";
const lines = [
{
type: "session",
version: 3,
id: parentSessionId,
timestamp: "2026-05-01T00:00:00.000Z",
cwd,
},
{
type: "message",
id: "user-1",
parentId: null,
timestamp: "2026-05-01T00:00:01.000Z",
message: { role: "user", content: "hello" },
},
{
type: "message",
id: "assistant-1",
parentId: "user-1",
timestamp: "2026-05-01T00:00:02.000Z",
message: {
role: "assistant",
content: [{ type: "text", text: "hi" }],
api: "openai-responses",
provider: "openai",
model: "gpt-5.4",
stopReason: "stop",
timestamp: 2,
},
},
{
type: "label",
id: "label-1",
parentId: "assistant-1",
timestamp: "2026-05-01T00:00:03.000Z",
targetId: "user-1",
label: "start",
},
];
await fs.writeFile(
parentSessionFile,
`${lines.map((entry) => JSON.stringify(entry)).join("\n")}\n`,
"utf-8",
);
const fork = await forkSessionFromParentRuntime({
parentEntry: {
sessionId: parentSessionId,
sessionFile: parentSessionFile,
updatedAt: Date.now(),
},
agentId: "main",
sessionsDir,
});
expect(fork).not.toBeNull();
expect(fork?.sessionFile).toContain(sessionsDir);
expect(fork?.sessionId).not.toBe(parentSessionId);
const raw = await fs.readFile(fork?.sessionFile ?? "", "utf-8");
const forkedEntries = raw
.trim()
.split(/\r?\n/u)
.map((line) => JSON.parse(line) as Record<string, unknown>);
const resolvedParentSessionFile = await fs.realpath(parentSessionFile);
expect(forkedEntries[0]).toMatchObject({
type: "session",
id: fork?.sessionId,
cwd,
parentSession: resolvedParentSessionFile,
});
expect(forkedEntries.map((entry) => entry.type)).toEqual([
"session",
"message",
"message",
"label",
]);
expect(forkedEntries.at(-1)).toMatchObject({
type: "label",
targetId: "user-1",
label: "start",
});
});
it("creates a header-only child when the parent has no entries", async () => {
const root = await makeRoot("openclaw-parent-fork-empty-");
const sessionsDir = path.join(root, "sessions");
await fs.mkdir(sessionsDir);
const parentSessionFile = path.join(sessionsDir, "parent.jsonl");
const parentSessionId = "parent-empty";
await fs.writeFile(
parentSessionFile,
`${JSON.stringify({
type: "session",
version: 3,
id: parentSessionId,
timestamp: "2026-05-01T00:00:00.000Z",
cwd: root,
})}\n`,
"utf-8",
);
const fork = await forkSessionFromParentRuntime({
parentEntry: {
sessionId: parentSessionId,
sessionFile: parentSessionFile,
updatedAt: Date.now(),
},
agentId: "main",
sessionsDir,
});
expect(fork).not.toBeNull();
const raw = await fs.readFile(fork?.sessionFile ?? "", "utf-8");
const lines = raw.trim().split(/\r?\n/u);
expect(lines).toHaveLength(1);
const resolvedParentSessionFile = await fs.realpath(parentSessionFile);
expect(JSON.parse(lines[0] ?? "{}")).toMatchObject({
type: "session",
id: fork?.sessionId,
parentSession: resolvedParentSessionFile,
});
});
});

View File

@@ -1,13 +1,32 @@
import crypto from "node:crypto";
import fs from "node:fs";
import fs from "node:fs/promises";
import path from "node:path";
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import { CURRENT_SESSION_VERSION, SessionManager } from "@mariozechner/pi-coding-agent";
import {
CURRENT_SESSION_VERSION,
migrateSessionEntries,
parseSessionEntries,
type FileEntry,
type SessionEntry as PiSessionEntry,
type SessionHeader,
} from "@mariozechner/pi-coding-agent";
import { v7 as uuidv7 } from "uuid";
import { estimateMessagesTokens } from "../../agents/compaction.js";
import { resolveSessionFilePath } from "../../config/sessions/paths.js";
import { resolveFreshSessionTotalTokens, type SessionEntry } from "../../config/sessions/types.js";
import {
resolveFreshSessionTotalTokens,
type SessionEntry as StoreSessionEntry,
} from "../../config/sessions/types.js";
import { readSessionMessagesAsync } from "../../gateway/session-utils.fs.js";
type ForkSourceTranscript = {
cwd: string;
sessionDir: string;
leafId: string | null;
branchEntries: PiSessionEntry[];
labelsToWrite: Array<{ targetId: string; label: string; timestamp: string }>;
};
function resolvePositiveTokenCount(value: number | undefined): number | undefined {
return typeof value === "number" && Number.isFinite(value) && value > 0
? Math.floor(value)
@@ -15,7 +34,7 @@ function resolvePositiveTokenCount(value: number | undefined): number | undefine
}
export async function resolveParentForkTokenCountRuntime(params: {
parentEntry: SessionEntry;
parentEntry: StoreSessionEntry;
storePath: string;
}): Promise<number | undefined> {
const freshPersistedTokens = resolveFreshSessionTotalTokens(params.parentEntry);
@@ -45,47 +64,216 @@ export async function resolveParentForkTokenCountRuntime(params: {
return resolvePositiveTokenCount(params.parentEntry.totalTokens);
}
export function forkSessionFromParentRuntime(params: {
parentEntry: SessionEntry;
function isSessionEntry(entry: FileEntry): entry is PiSessionEntry {
return (
entry.type !== "session" &&
typeof (entry as { id?: unknown }).id === "string" &&
(typeof (entry as { timestamp?: unknown }).timestamp === "string" ||
typeof (entry as { timestamp?: unknown }).timestamp === "number")
);
}
function buildEntryIndex(entries: PiSessionEntry[]): Map<string, PiSessionEntry> {
return new Map(entries.map((entry) => [entry.id, entry]));
}
function readBranch(params: {
byId: Map<string, PiSessionEntry>;
leafId: string | null;
}): PiSessionEntry[] {
const branchEntries: PiSessionEntry[] = [];
let current = params.leafId ? params.byId.get(params.leafId) : undefined;
while (current) {
branchEntries.unshift(current);
current = current.parentId ? params.byId.get(current.parentId) : undefined;
}
return branchEntries;
}
function generateEntryId(existingIds: Set<string>): string {
for (let attempt = 0; attempt < 100; attempt += 1) {
const id = crypto.randomUUID().slice(0, 8);
if (!existingIds.has(id)) {
existingIds.add(id);
return id;
}
}
const id = crypto.randomUUID();
existingIds.add(id);
return id;
}
function collectBranchLabels(params: {
allEntries: PiSessionEntry[];
pathEntryIds: Set<string>;
}): Array<{ targetId: string; label: string; timestamp: string }> {
const labelsToWrite: Array<{ targetId: string; label: string; timestamp: string }> = [];
for (const entry of params.allEntries) {
if (
entry.type === "label" &&
entry.label &&
params.pathEntryIds.has(entry.targetId) &&
typeof entry.timestamp === "string"
) {
labelsToWrite.push({
targetId: entry.targetId,
label: entry.label,
timestamp: entry.timestamp,
});
}
}
return labelsToWrite;
}
async function readForkSourceTranscript(
parentSessionFile: string,
): Promise<ForkSourceTranscript | null> {
const raw = await fs.readFile(parentSessionFile, "utf-8");
const fileEntries = parseSessionEntries(raw);
migrateSessionEntries(fileEntries);
const header =
fileEntries.find((entry): entry is SessionHeader => entry.type === "session") ?? null;
const entries = fileEntries.filter(isSessionEntry);
const byId = buildEntryIndex(entries);
const leafId = entries.at(-1)?.id ?? null;
const branchEntries = readBranch({ byId, leafId });
const pathEntryIds = new Set(
branchEntries.filter((entry) => entry.type !== "label").map((entry) => entry.id),
);
return {
cwd: header?.cwd ?? process.cwd(),
sessionDir: path.dirname(parentSessionFile),
leafId,
branchEntries,
labelsToWrite: collectBranchLabels({ allEntries: entries, pathEntryIds }),
};
}
function buildBranchLabelEntries(params: {
labelsToWrite: Array<{ targetId: string; label: string; timestamp: string }>;
pathEntryIds: Set<string>;
lastEntryId: string | null;
}): PiSessionEntry[] {
let parentId = params.lastEntryId;
const labelEntries: PiSessionEntry[] = [];
for (const { targetId, label, timestamp } of params.labelsToWrite) {
const labelEntry = {
type: "label",
id: generateEntryId(params.pathEntryIds),
parentId,
timestamp,
targetId,
label,
} satisfies PiSessionEntry;
params.pathEntryIds.add(labelEntry.id);
labelEntries.push(labelEntry);
parentId = labelEntry.id;
}
return labelEntries;
}
async function writeForkHeaderOnly(params: {
parentSessionFile: string;
sessionDir: string;
cwd: string;
}): Promise<{ sessionId: string; sessionFile: string }> {
const sessionId = uuidv7();
const timestamp = new Date().toISOString();
const fileTimestamp = timestamp.replace(/[:.]/g, "-");
const sessionFile = path.join(params.sessionDir, `${fileTimestamp}_${sessionId}.jsonl`);
const header = {
type: "session",
version: CURRENT_SESSION_VERSION,
id: sessionId,
timestamp,
cwd: params.cwd,
parentSession: params.parentSessionFile,
} satisfies SessionHeader;
await fs.mkdir(path.dirname(sessionFile), { recursive: true });
await fs.writeFile(sessionFile, `${JSON.stringify(header)}\n`, {
encoding: "utf-8",
mode: 0o600,
flag: "wx",
});
return { sessionId, sessionFile };
}
async function writeBranchedSession(params: {
parentSessionFile: string;
source: ForkSourceTranscript;
}): Promise<{ sessionId: string; sessionFile: string }> {
const sessionId = uuidv7();
const timestamp = new Date().toISOString();
const fileTimestamp = timestamp.replace(/[:.]/g, "-");
const sessionFile = path.join(params.source.sessionDir, `${fileTimestamp}_${sessionId}.jsonl`);
const pathWithoutLabels = params.source.branchEntries.filter((entry) => entry.type !== "label");
const pathEntryIds = new Set(pathWithoutLabels.map((entry) => entry.id));
const labelEntries = buildBranchLabelEntries({
labelsToWrite: params.source.labelsToWrite,
pathEntryIds,
lastEntryId: pathWithoutLabels.at(-1)?.id ?? null,
});
const header = {
type: "session",
version: CURRENT_SESSION_VERSION,
id: sessionId,
timestamp,
cwd: params.source.cwd,
parentSession: params.parentSessionFile,
} satisfies SessionHeader;
const entries = [header, ...pathWithoutLabels, ...labelEntries];
const hasAssistant = entries.some(
(entry) => entry.type === "message" && entry.message.role === "assistant",
);
if (hasAssistant) {
await fs.mkdir(path.dirname(sessionFile), { recursive: true });
await fs.writeFile(
sessionFile,
`${entries.map((entry) => JSON.stringify(entry)).join("\n")}\n`,
{
encoding: "utf-8",
mode: 0o600,
flag: "wx",
},
);
}
return { sessionId, sessionFile };
}
async function fileExists(filePath: string): Promise<boolean> {
try {
const stat = await fs.stat(filePath);
return stat.isFile();
} catch {
return false;
}
}
export async function forkSessionFromParentRuntime(params: {
parentEntry: StoreSessionEntry;
agentId: string;
sessionsDir: string;
}): { sessionId: string; sessionFile: string } | null {
}): Promise<{ sessionId: string; sessionFile: string } | null> {
const parentSessionFile = resolveSessionFilePath(
params.parentEntry.sessionId,
params.parentEntry,
{ agentId: params.agentId, sessionsDir: params.sessionsDir },
);
if (!parentSessionFile || !fs.existsSync(parentSessionFile)) {
if (!parentSessionFile || !(await fileExists(parentSessionFile))) {
return null;
}
try {
const manager = SessionManager.open(parentSessionFile);
const leafId = manager.getLeafId();
if (leafId) {
const sessionFile = manager.createBranchedSession(leafId) ?? manager.getSessionFile();
const sessionId = manager.getSessionId();
if (sessionFile && sessionId) {
return { sessionId, sessionFile };
}
const source = await readForkSourceTranscript(parentSessionFile);
if (!source) {
return null;
}
const sessionId = crypto.randomUUID();
const timestamp = new Date().toISOString();
const fileTimestamp = timestamp.replace(/[:.]/g, "-");
const sessionFile = path.join(manager.getSessionDir(), `${fileTimestamp}_${sessionId}.jsonl`);
const header = {
type: "session",
version: CURRENT_SESSION_VERSION,
id: sessionId,
timestamp,
cwd: manager.getCwd(),
parentSession: parentSessionFile,
};
fs.writeFileSync(sessionFile, `${JSON.stringify(header)}\n`, {
encoding: "utf-8",
mode: 0o600,
flag: "wx",
});
return { sessionId, sessionFile };
return source.leafId
? await writeBranchedSession({ parentSessionFile, source })
: await writeForkHeaderOnly({
parentSessionFile,
sessionDir: source.sessionDir,
cwd: source.cwd,
});
} catch {
return null;
}