diff --git a/src/gateway/session-utils.fs.test.ts b/src/gateway/session-utils.fs.test.ts index 0521b0270f2..17e857ca4a6 100644 --- a/src/gateway/session-utils.fs.test.ts +++ b/src/gateway/session-utils.fs.test.ts @@ -667,6 +667,51 @@ describe("readSessionMessages", () => { } }); + test("honors byte caps for sync recent tree-message reads", () => { + const sessionId = "test-session-recent-tree-byte-cap"; + const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`); + const hugeContent = "huge ".repeat(4096); + const lines = [ + JSON.stringify({ type: "session", version: 3, id: sessionId }), + JSON.stringify({ + type: "message", + id: "root", + parentId: null, + message: { role: "user", content: "root" }, + }), + JSON.stringify({ + type: "message", + id: "huge", + parentId: "root", + message: { role: "assistant", content: hugeContent }, + }), + JSON.stringify({ + type: "message", + id: "tail", + parentId: "huge", + message: { role: "assistant", content: "tail" }, + }), + ]; + fs.writeFileSync(transcriptPath, `${lines.join("\n")}\n`, "utf-8"); + const readFileSpy = vi.spyOn(fs, "readFileSync"); + const sessionManagerOpenSpy = vi.spyOn(SessionManager, "open"); + + try { + const out = readRecentSessionMessages(sessionId, storePath, undefined, { + maxMessages: 2, + maxBytes: 2048, + }); + + expect(out).toEqual([expect.objectContaining({ role: "assistant", content: "tail" })]); + expect(JSON.stringify(out)).not.toContain("huge"); + expect(readFileSpy).not.toHaveBeenCalled(); + expect(sessionManagerOpenSpy).not.toHaveBeenCalled(); + } finally { + readFileSpy.mockRestore(); + sessionManagerOpenSpy.mockRestore(); + } + }); + test("counts transcript messages without loading the whole file", () => { const sessionId = "test-session-count-large"; const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`); @@ -857,22 +902,28 @@ describe("readSessionMessages", () => { const rawTranscript = fs.readFileSync(sessionFile, "utf-8"); expect(rawTranscript).toContain("original wrapped prompt"); expect(rawTranscript).toContain("clean prompt"); + const sessionManagerOpenSpy = vi.spyOn(SessionManager, "open"); - const out = readSessionMessages(sessionId, storePath, sessionFile); - expect(out).toHaveLength(2); - expect(out).toEqual([ - expect.objectContaining({ - role: "user", - content: "clean prompt", - __openclaw: expect.objectContaining({ seq: 1 }), - }), - expect.objectContaining({ - role: "assistant", - content: [{ type: "text", text: "clean answer" }], - __openclaw: expect.objectContaining({ seq: 2 }), - }), - ]); - expect(JSON.stringify(out)).not.toContain("original wrapped prompt"); + try { + const out = readSessionMessages(sessionId, storePath, sessionFile); + expect(out).toHaveLength(2); + expect(out).toEqual([ + expect.objectContaining({ + role: "user", + content: "clean prompt", + __openclaw: expect.objectContaining({ seq: 1 }), + }), + expect.objectContaining({ + role: "assistant", + content: [{ type: "text", text: "clean answer" }], + __openclaw: expect.objectContaining({ seq: 2 }), + }), + ]); + expect(JSON.stringify(out)).not.toContain("original wrapped prompt"); + expect(sessionManagerOpenSpy).not.toHaveBeenCalled(); + } finally { + sessionManagerOpenSpy.mockRestore(); + } }); test.each([ diff --git a/src/gateway/session-utils.fs.ts b/src/gateway/session-utils.fs.ts index 37512760b84..7bcb08b1e16 100644 --- a/src/gateway/session-utils.fs.ts +++ b/src/gateway/session-utils.fs.ts @@ -1,6 +1,5 @@ import fs from "node:fs"; import { StringDecoder } from "node:string_decoder"; -import { SessionManager, type SessionEntry } from "@mariozechner/pi-coding-agent"; import { deriveSessionTotalTokens, hasNonzeroUsage, normalizeUsage } from "../agents/usage.js"; import { jsonUtf8Bytes } from "../infra/json-utf8-bytes.js"; import { hasInterSessionUserProvenance } from "../sessions/input-provenance.js"; @@ -151,69 +150,7 @@ export function readSessionMessages( return []; } - const lines = fs.readFileSync(filePath, "utf-8").split(/\r?\n/); - const hasTreeEntries = lines.some(hasSessionTreeEntry); - let branchEntries: SessionEntry[] | null = null; - if (hasTreeEntries) { - try { - branchEntries = SessionManager.open(filePath).getBranch(); - } catch { - branchEntries = null; - } - } - - if (branchEntries) { - const messages: unknown[] = []; - let messageSeq = 0; - for (const entry of branchEntries) { - if (entry.type === "message" && entry.message) { - messageSeq += 1; - messages.push( - attachOpenClawTranscriptMeta(entry.message, { - ...(typeof entry.id === "string" ? { id: entry.id } : {}), - seq: messageSeq, - }), - ); - continue; - } - - if (entry.type === "compaction") { - const ts = typeof entry.timestamp === "string" ? Date.parse(entry.timestamp) : Number.NaN; - const timestamp = Number.isFinite(ts) ? ts : Date.now(); - messageSeq += 1; - messages.push({ - role: "system", - content: [{ type: "text", text: "Compaction" }], - timestamp, - __openclaw: { - kind: "compaction", - id: typeof entry.id === "string" ? entry.id : undefined, - seq: messageSeq, - }, - }); - } - } - return messages; - } - - const messages: unknown[] = []; - let messageSeq = 0; - for (const line of lines) { - if (!line.trim()) { - continue; - } - try { - const parsed = JSON.parse(line); - const message = parsedSessionEntryToMessage(parsed, messageSeq + 1); - if (message) { - messageSeq += 1; - messages.push(message); - } - } catch { - // ignore bad lines - } - } - return messages; + return transcriptRecordsToMessages(readSelectedTranscriptRecords(filePath)); } type ReadRecentSessionMessagesOptions = { @@ -283,25 +220,7 @@ export function readRecentSessionMessages( .filter((line) => line.trim().length > 0) .slice(-maxLines); - if (lines.some(hasSessionTreeEntry)) { - return readSessionMessages(sessionId, storePath, sessionFile).slice(-maxMessages); - } - - const messages: unknown[] = []; - let messageSeq = 0; - for (const line of lines) { - try { - const parsed = JSON.parse(line); - const message = parsedSessionEntryToMessage(parsed, messageSeq + 1); - if (message) { - messageSeq += 1; - messages.push(message); - } - } catch { - // ignore bad tail lines - } - } - return messages.slice(-maxMessages); + return parseRecentTranscriptTailMessages(lines, maxMessages); }) ?? [] ); } @@ -401,24 +320,51 @@ function selectBoundedActiveTailRecords(entries: TailTranscriptRecord[]): TailTr return selected.toReversed(); } -function parseRecentTranscriptTailMessages(lines: string[], maxMessages: number): unknown[] { - const entries = lines.flatMap((line) => { - const entry = parseTailTranscriptRecord(line); - return entry ? [entry] : []; +function readTranscriptRecords(filePath: string): TailTranscriptRecord[] { + const records: TailTranscriptRecord[] = []; + visitTranscriptLines(filePath, (line) => { + if (!line.trim()) { + return; + } + const record = parseTailTranscriptRecord(line); + if (record && record.record.type !== "session") { + records.push(record); + } }); - const selected = entries.some(tailRecordHasTreeLink) - ? selectBoundedActiveTailRecords(entries) - : entries; + return records; +} + +function selectActiveTranscriptRecords(records: TailTranscriptRecord[]): TailTranscriptRecord[] { + return records.some(tailRecordHasTreeLink) ? selectBoundedActiveTailRecords(records) : records; +} + +function readSelectedTranscriptRecords(filePath: string): TailTranscriptRecord[] { + try { + return selectActiveTranscriptRecords(readTranscriptRecords(filePath)); + } catch { + return []; + } +} + +function transcriptRecordsToMessages(records: TailTranscriptRecord[]): unknown[] { const messages: unknown[] = []; let messageSeq = 0; - for (const entry of selected) { + for (const entry of records) { const message = parsedSessionEntryToMessage(entry.record, messageSeq + 1); if (message) { messageSeq += 1; messages.push(message); } } - return messages.slice(-maxMessages); + return messages; +} + +function parseRecentTranscriptTailMessages(lines: string[], maxMessages: number): unknown[] { + const entries = lines.flatMap((line) => { + const entry = parseTailTranscriptRecord(line); + return entry ? [entry] : []; + }); + return transcriptRecordsToMessages(selectActiveTranscriptRecords(entries)).slice(-maxMessages); } function visitTranscriptLines(filePath: string, visit: (line: string) => void): void { @@ -479,61 +425,6 @@ async function visitTranscriptLinesAsync( } } -function transcriptHasTreeEntries(filePath: string): boolean { - let hasTreeEntries = false; - try { - visitTranscriptLines(filePath, (line) => { - if (!hasTreeEntries && hasSessionTreeEntry(line)) { - hasTreeEntries = true; - } - }); - } catch { - return false; - } - return hasTreeEntries; -} - -function visitSessionManagerBranchMessages( - filePath: string, - visit: (message: unknown, seq: number) => void, -): number { - const branchEntries = SessionManager.open(filePath).getBranch(); - let messageSeq = 0; - for (const entry of branchEntries) { - if (entry.type === "message" && entry.message) { - messageSeq += 1; - visit( - attachOpenClawTranscriptMeta(entry.message, { - ...(typeof entry.id === "string" ? { id: entry.id } : {}), - seq: messageSeq, - }), - messageSeq, - ); - continue; - } - - if (entry.type === "compaction") { - const ts = typeof entry.timestamp === "string" ? Date.parse(entry.timestamp) : Number.NaN; - const timestamp = Number.isFinite(ts) ? ts : Date.now(); - messageSeq += 1; - visit( - { - role: "system", - content: [{ type: "text", text: "Compaction" }], - timestamp, - __openclaw: { - kind: "compaction", - id: typeof entry.id === "string" ? entry.id : undefined, - seq: messageSeq, - }, - }, - messageSeq, - ); - } - } - return messageSeq; -} - export function visitSessionMessages( sessionId: string, storePath: string | undefined, @@ -545,35 +436,11 @@ export function visitSessionMessages( return 0; } - if (transcriptHasTreeEntries(filePath)) { - try { - return visitSessionManagerBranchMessages(filePath, visit); - } catch { - return 0; - } + const messages = transcriptRecordsToMessages(readSelectedTranscriptRecords(filePath)); + for (const [index, message] of messages.entries()) { + visit(message, index + 1); } - - let messageSeq = 0; - try { - visitTranscriptLines(filePath, (line) => { - if (!line.trim()) { - return; - } - try { - const parsed = JSON.parse(line); - const message = parsedSessionEntryToMessage(parsed, messageSeq + 1); - if (message) { - messageSeq += 1; - visit(message, messageSeq); - } - } catch { - // ignore bad lines - } - }); - } catch { - return 0; - } - return messageSeq; + return messages.length; } export function readSessionMessageCount( @@ -763,18 +630,6 @@ export function readRecentSessionTranscriptLines(params: { return { lines, totalLines }; } -function hasSessionTreeEntry(line: string): boolean { - if (!line.trim()) { - return false; - } - try { - const parsed = JSON.parse(line) as { type?: unknown; id?: unknown; parentId?: unknown }; - return parsed.type !== "session" && typeof parsed.id === "string" && "parentId" in parsed; - } catch { - return false; - } -} - function parsedSessionEntryToMessage(parsed: unknown, seq: number): unknown { if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) { return null;