refactor: parse session reads without manager

This commit is contained in:
Peter Steinberger
2026-05-02 05:12:02 +01:00
parent c76ee644c2
commit 7dc5b9484f
2 changed files with 108 additions and 202 deletions

View File

@@ -667,6 +667,51 @@ describe("readSessionMessages", () => {
}
});
test("honors byte caps for sync recent tree-message reads", () => {
const sessionId = "test-session-recent-tree-byte-cap";
const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`);
const hugeContent = "huge ".repeat(4096);
const lines = [
JSON.stringify({ type: "session", version: 3, id: sessionId }),
JSON.stringify({
type: "message",
id: "root",
parentId: null,
message: { role: "user", content: "root" },
}),
JSON.stringify({
type: "message",
id: "huge",
parentId: "root",
message: { role: "assistant", content: hugeContent },
}),
JSON.stringify({
type: "message",
id: "tail",
parentId: "huge",
message: { role: "assistant", content: "tail" },
}),
];
fs.writeFileSync(transcriptPath, `${lines.join("\n")}\n`, "utf-8");
const readFileSpy = vi.spyOn(fs, "readFileSync");
const sessionManagerOpenSpy = vi.spyOn(SessionManager, "open");
try {
const out = readRecentSessionMessages(sessionId, storePath, undefined, {
maxMessages: 2,
maxBytes: 2048,
});
expect(out).toEqual([expect.objectContaining({ role: "assistant", content: "tail" })]);
expect(JSON.stringify(out)).not.toContain("huge");
expect(readFileSpy).not.toHaveBeenCalled();
expect(sessionManagerOpenSpy).not.toHaveBeenCalled();
} finally {
readFileSpy.mockRestore();
sessionManagerOpenSpy.mockRestore();
}
});
test("counts transcript messages without loading the whole file", () => {
const sessionId = "test-session-count-large";
const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`);
@@ -857,22 +902,28 @@ describe("readSessionMessages", () => {
const rawTranscript = fs.readFileSync(sessionFile, "utf-8");
expect(rawTranscript).toContain("original wrapped prompt");
expect(rawTranscript).toContain("clean prompt");
const sessionManagerOpenSpy = vi.spyOn(SessionManager, "open");
const out = readSessionMessages(sessionId, storePath, sessionFile);
expect(out).toHaveLength(2);
expect(out).toEqual([
expect.objectContaining({
role: "user",
content: "clean prompt",
__openclaw: expect.objectContaining({ seq: 1 }),
}),
expect.objectContaining({
role: "assistant",
content: [{ type: "text", text: "clean answer" }],
__openclaw: expect.objectContaining({ seq: 2 }),
}),
]);
expect(JSON.stringify(out)).not.toContain("original wrapped prompt");
try {
const out = readSessionMessages(sessionId, storePath, sessionFile);
expect(out).toHaveLength(2);
expect(out).toEqual([
expect.objectContaining({
role: "user",
content: "clean prompt",
__openclaw: expect.objectContaining({ seq: 1 }),
}),
expect.objectContaining({
role: "assistant",
content: [{ type: "text", text: "clean answer" }],
__openclaw: expect.objectContaining({ seq: 2 }),
}),
]);
expect(JSON.stringify(out)).not.toContain("original wrapped prompt");
expect(sessionManagerOpenSpy).not.toHaveBeenCalled();
} finally {
sessionManagerOpenSpy.mockRestore();
}
});
test.each([

View File

@@ -1,6 +1,5 @@
import fs from "node:fs";
import { StringDecoder } from "node:string_decoder";
import { SessionManager, type SessionEntry } from "@mariozechner/pi-coding-agent";
import { deriveSessionTotalTokens, hasNonzeroUsage, normalizeUsage } from "../agents/usage.js";
import { jsonUtf8Bytes } from "../infra/json-utf8-bytes.js";
import { hasInterSessionUserProvenance } from "../sessions/input-provenance.js";
@@ -151,69 +150,7 @@ export function readSessionMessages(
return [];
}
const lines = fs.readFileSync(filePath, "utf-8").split(/\r?\n/);
const hasTreeEntries = lines.some(hasSessionTreeEntry);
let branchEntries: SessionEntry[] | null = null;
if (hasTreeEntries) {
try {
branchEntries = SessionManager.open(filePath).getBranch();
} catch {
branchEntries = null;
}
}
if (branchEntries) {
const messages: unknown[] = [];
let messageSeq = 0;
for (const entry of branchEntries) {
if (entry.type === "message" && entry.message) {
messageSeq += 1;
messages.push(
attachOpenClawTranscriptMeta(entry.message, {
...(typeof entry.id === "string" ? { id: entry.id } : {}),
seq: messageSeq,
}),
);
continue;
}
if (entry.type === "compaction") {
const ts = typeof entry.timestamp === "string" ? Date.parse(entry.timestamp) : Number.NaN;
const timestamp = Number.isFinite(ts) ? ts : Date.now();
messageSeq += 1;
messages.push({
role: "system",
content: [{ type: "text", text: "Compaction" }],
timestamp,
__openclaw: {
kind: "compaction",
id: typeof entry.id === "string" ? entry.id : undefined,
seq: messageSeq,
},
});
}
}
return messages;
}
const messages: unknown[] = [];
let messageSeq = 0;
for (const line of lines) {
if (!line.trim()) {
continue;
}
try {
const parsed = JSON.parse(line);
const message = parsedSessionEntryToMessage(parsed, messageSeq + 1);
if (message) {
messageSeq += 1;
messages.push(message);
}
} catch {
// ignore bad lines
}
}
return messages;
return transcriptRecordsToMessages(readSelectedTranscriptRecords(filePath));
}
type ReadRecentSessionMessagesOptions = {
@@ -283,25 +220,7 @@ export function readRecentSessionMessages(
.filter((line) => line.trim().length > 0)
.slice(-maxLines);
if (lines.some(hasSessionTreeEntry)) {
return readSessionMessages(sessionId, storePath, sessionFile).slice(-maxMessages);
}
const messages: unknown[] = [];
let messageSeq = 0;
for (const line of lines) {
try {
const parsed = JSON.parse(line);
const message = parsedSessionEntryToMessage(parsed, messageSeq + 1);
if (message) {
messageSeq += 1;
messages.push(message);
}
} catch {
// ignore bad tail lines
}
}
return messages.slice(-maxMessages);
return parseRecentTranscriptTailMessages(lines, maxMessages);
}) ?? []
);
}
@@ -401,24 +320,51 @@ function selectBoundedActiveTailRecords(entries: TailTranscriptRecord[]): TailTr
return selected.toReversed();
}
function parseRecentTranscriptTailMessages(lines: string[], maxMessages: number): unknown[] {
const entries = lines.flatMap((line) => {
const entry = parseTailTranscriptRecord(line);
return entry ? [entry] : [];
function readTranscriptRecords(filePath: string): TailTranscriptRecord[] {
const records: TailTranscriptRecord[] = [];
visitTranscriptLines(filePath, (line) => {
if (!line.trim()) {
return;
}
const record = parseTailTranscriptRecord(line);
if (record && record.record.type !== "session") {
records.push(record);
}
});
const selected = entries.some(tailRecordHasTreeLink)
? selectBoundedActiveTailRecords(entries)
: entries;
return records;
}
function selectActiveTranscriptRecords(records: TailTranscriptRecord[]): TailTranscriptRecord[] {
return records.some(tailRecordHasTreeLink) ? selectBoundedActiveTailRecords(records) : records;
}
function readSelectedTranscriptRecords(filePath: string): TailTranscriptRecord[] {
try {
return selectActiveTranscriptRecords(readTranscriptRecords(filePath));
} catch {
return [];
}
}
function transcriptRecordsToMessages(records: TailTranscriptRecord[]): unknown[] {
const messages: unknown[] = [];
let messageSeq = 0;
for (const entry of selected) {
for (const entry of records) {
const message = parsedSessionEntryToMessage(entry.record, messageSeq + 1);
if (message) {
messageSeq += 1;
messages.push(message);
}
}
return messages.slice(-maxMessages);
return messages;
}
function parseRecentTranscriptTailMessages(lines: string[], maxMessages: number): unknown[] {
const entries = lines.flatMap((line) => {
const entry = parseTailTranscriptRecord(line);
return entry ? [entry] : [];
});
return transcriptRecordsToMessages(selectActiveTranscriptRecords(entries)).slice(-maxMessages);
}
function visitTranscriptLines(filePath: string, visit: (line: string) => void): void {
@@ -479,61 +425,6 @@ async function visitTranscriptLinesAsync(
}
}
function transcriptHasTreeEntries(filePath: string): boolean {
let hasTreeEntries = false;
try {
visitTranscriptLines(filePath, (line) => {
if (!hasTreeEntries && hasSessionTreeEntry(line)) {
hasTreeEntries = true;
}
});
} catch {
return false;
}
return hasTreeEntries;
}
function visitSessionManagerBranchMessages(
filePath: string,
visit: (message: unknown, seq: number) => void,
): number {
const branchEntries = SessionManager.open(filePath).getBranch();
let messageSeq = 0;
for (const entry of branchEntries) {
if (entry.type === "message" && entry.message) {
messageSeq += 1;
visit(
attachOpenClawTranscriptMeta(entry.message, {
...(typeof entry.id === "string" ? { id: entry.id } : {}),
seq: messageSeq,
}),
messageSeq,
);
continue;
}
if (entry.type === "compaction") {
const ts = typeof entry.timestamp === "string" ? Date.parse(entry.timestamp) : Number.NaN;
const timestamp = Number.isFinite(ts) ? ts : Date.now();
messageSeq += 1;
visit(
{
role: "system",
content: [{ type: "text", text: "Compaction" }],
timestamp,
__openclaw: {
kind: "compaction",
id: typeof entry.id === "string" ? entry.id : undefined,
seq: messageSeq,
},
},
messageSeq,
);
}
}
return messageSeq;
}
export function visitSessionMessages(
sessionId: string,
storePath: string | undefined,
@@ -545,35 +436,11 @@ export function visitSessionMessages(
return 0;
}
if (transcriptHasTreeEntries(filePath)) {
try {
return visitSessionManagerBranchMessages(filePath, visit);
} catch {
return 0;
}
const messages = transcriptRecordsToMessages(readSelectedTranscriptRecords(filePath));
for (const [index, message] of messages.entries()) {
visit(message, index + 1);
}
let messageSeq = 0;
try {
visitTranscriptLines(filePath, (line) => {
if (!line.trim()) {
return;
}
try {
const parsed = JSON.parse(line);
const message = parsedSessionEntryToMessage(parsed, messageSeq + 1);
if (message) {
messageSeq += 1;
visit(message, messageSeq);
}
} catch {
// ignore bad lines
}
});
} catch {
return 0;
}
return messageSeq;
return messages.length;
}
export function readSessionMessageCount(
@@ -763,18 +630,6 @@ export function readRecentSessionTranscriptLines(params: {
return { lines, totalLines };
}
function hasSessionTreeEntry(line: string): boolean {
if (!line.trim()) {
return false;
}
try {
const parsed = JSON.parse(line) as { type?: unknown; id?: unknown; parentId?: unknown };
return parsed.type !== "session" && typeof parsed.id === "string" && "parentId" in parsed;
} catch {
return false;
}
}
function parsedSessionEntryToMessage(parsed: unknown, seq: number): unknown {
if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
return null;