From 8cd20e220fbce3c1ed5beb94ece0db9e2bc9104b Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 15 Feb 2026 21:53:12 +0000 Subject: [PATCH] refactor(infra): share jsonl transcript reader --- src/infra/session-cost-usage.ts | 83 ++++++++++++++++----------------- 1 file changed, 39 insertions(+), 44 deletions(-) diff --git a/src/infra/session-cost-usage.ts b/src/infra/session-cost-usage.ts index 4dd1203f91e..e4e64a4c6e2 100644 --- a/src/infra/session-cost-usage.ts +++ b/src/infra/session-cost-usage.ts @@ -212,39 +212,52 @@ const applyCostTotal = (totals: CostUsageTotals, costTotal: number | undefined) totals.totalCost += costTotal; }; +async function* readJsonlRecords(filePath: string): AsyncGenerator> { + const fileStream = fs.createReadStream(filePath, { encoding: "utf-8" }); + const rl = readline.createInterface({ input: fileStream, crlfDelay: Infinity }); + try { + for await (const line of rl) { + const trimmed = line.trim(); + if (!trimmed) { + continue; + } + try { + const parsed = JSON.parse(trimmed) as unknown; + if (!parsed || typeof parsed !== "object") { + continue; + } + yield parsed as Record; + } catch { + // Ignore malformed lines + } + } + } finally { + rl.close(); + fileStream.destroy(); + } +} + async function scanTranscriptFile(params: { filePath: string; config?: OpenClawConfig; onEntry: (entry: ParsedTranscriptEntry) => void; }): Promise { - const fileStream = fs.createReadStream(params.filePath, { encoding: "utf-8" }); - const rl = readline.createInterface({ input: fileStream, crlfDelay: Infinity }); - - for await (const line of rl) { - const trimmed = line.trim(); - if (!trimmed) { + for await (const parsed of readJsonlRecords(params.filePath)) { + const entry = parseTranscriptEntry(parsed); + if (!entry) { continue; } - try { - const parsed = JSON.parse(trimmed) as Record; - const entry = parseTranscriptEntry(parsed); - if (!entry) { - continue; - } - if (entry.usage && entry.costTotal === undefined) { - const cost = resolveModelCostConfig({ - provider: entry.provider, - model: entry.model, - config: params.config, - }); - entry.costTotal = estimateUsageCost({ usage: entry.usage, cost }); - } - - params.onEntry(entry); - } catch { - // Ignore malformed lines + if (entry.usage && entry.costTotal === undefined) { + const cost = resolveModelCostConfig({ + provider: entry.provider, + model: entry.model, + config: params.config, + }); + entry.costTotal = estimateUsageCost({ usage: entry.usage, cost }); } + + params.onEntry(entry); } } @@ -400,16 +413,8 @@ export async function discoverAllSessions(params?: { // Try to read first user message for label extraction let firstUserMessage: string | undefined; try { - const fileStream = fs.createReadStream(filePath, { encoding: "utf-8" }); - const rl = readline.createInterface({ input: fileStream, crlfDelay: Infinity }); - - for await (const line of rl) { - const trimmed = line.trim(); - if (!trimmed) { - continue; - } + for await (const parsed of readJsonlRecords(filePath)) { try { - const parsed = JSON.parse(trimmed) as Record; const message = parsed.message as Record | undefined; if (message?.role === "user") { const content = message.content; @@ -436,8 +441,6 @@ export async function discoverAllSessions(params?: { // Skip malformed lines } } - rl.close(); - fileStream.destroy(); } catch { // Ignore read errors } @@ -831,16 +834,8 @@ export async function loadSessionLogs(params: { const logs: SessionLogEntry[] = []; const limit = params.limit ?? 50; - const fileStream = fs.createReadStream(sessionFile, { encoding: "utf-8" }); - const rl = readline.createInterface({ input: fileStream, crlfDelay: Infinity }); - - for await (const line of rl) { - const trimmed = line.trim(); - if (!trimmed) { - continue; - } + for await (const parsed of readJsonlRecords(sessionFile)) { try { - const parsed = JSON.parse(trimmed) as Record; const message = parsed.message as Record | undefined; if (!message) { continue;