From 4d9c658f4058576e3356d0ee9d95525563f49ec0 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 2 May 2026 07:36:01 +0100 Subject: [PATCH] perf: bound async transcript history reads (#75977) Summary: - The PR bounds async transcript history reads and shares async transcript-index builds across gateway history, embedded/TUI history, restart recovery, fork token checks, and preflight compaction paths. - Reproducibility: not applicable. this is a performance PR rather than a user bug report. The verification pa ... ource review plus the added unit coverage for bounded reads, usage snapshots, and concurrent index sharing. ClawSweeper fixups: - No separate fixup commits were needed after automerge opt-in. Validation: - ClawSweeper review passed for head ccfe33658c08c21d2a7014ff1cec9fc70f77e05f. - Required merge gates passed before the squash merge. Prepared head SHA: ccfe33658c08c21d2a7014ff1cec9fc70f77e05f Review: https://github.com/openclaw/openclaw/pull/75977#issuecomment-4363170293 Co-authored-by: Peter Steinberger --- CHANGELOG.md | 1 + .../session-management-compaction.md | 6 + src/agents/main-session-restart-recovery.ts | 5 + src/agents/subagent-orphan-recovery.ts | 5 + .../tools/embedded-gateway-stub.test.ts | 22 +- src/agents/tools/embedded-gateway-stub.ts | 19 +- .../reply/agent-runner-memory.test.ts | 162 ++++++++++ src/auto-reply/reply/agent-runner-memory.ts | 99 +++++- .../reply/session-fork.runtime.test.ts | 153 ++++++++- src/auto-reply/reply/session-fork.runtime.ts | 72 ++++- .../gateway-models.profiles.live.test.ts | 5 +- src/gateway/managed-image-attachments.ts | 5 +- src/gateway/server-methods/artifacts.ts | 29 +- src/gateway/session-history-state.ts | 4 + src/gateway/session-reset-service.ts | 5 +- src/gateway/session-transcript-index.fs.ts | 26 +- src/gateway/session-utils.fs.test.ts | 105 ++++++- src/gateway/session-utils.fs.ts | 295 ++++++++++++------ src/gateway/session-utils.ts | 3 + src/gateway/sessions-history-http.ts | 5 +- src/tui/embedded-backend.ts | 10 +- 21 files changed, 890 insertions(+), 146 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 48edd940802..02c2e568bfb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,7 @@ Docs: https://docs.openclaw.ai - Plugins/ClawHub: preserve official source-linked trust through archive installs, so OpenClaw can install trusted ClawHub plugin packages that trigger the built-in dangerous-pattern scanner. Thanks @vincentkoc. - Plugins/ClawHub: install package runtime dependencies for archive-backed plugin installs, so ClawHub packages such as WhatsApp load declared dependencies after download. Thanks @vincentkoc. - Providers/LM Studio: allow `models.providers.lmstudio.params.preload: false` to skip OpenClaw's native model-load call so LM Studio JIT loading, idle TTL, and auto-evict can own model lifecycle. Fixes #75921. Thanks @garyd9. +- Agents/transcripts: keep chat history, restart recovery, fork token checks, and stale-token compaction checks on bounded async transcript reads or cached async indexes instead of reparsing large session files. Thanks @mariozechner. - Telegram: inherit the process DNS result order for Bot API transport and downgrade recovered sticky IPv4 fallback promotions to debug logs, while keeping pinned-IP escalation warnings visible. Fixes #75904. Thanks @highfly-hi and @neeravmakwana. - Sessions: keep durable external conversation pointers, including group and thread-scoped chat sessions, out of age, count, and disk-budget maintenance eviction while still allowing synthetic runtime entries to age out. Fixes #58088. Thanks @drinkflav. - Web search/MiniMax: allow `MINIMAX_OAUTH_TOKEN` to satisfy MiniMax Search credentials, so OAuth-authorized MiniMax Token Plan setups do not need a separate web-search key. Fixes #65768. Thanks @kikibrian and @zhouhe-xydt. diff --git a/docs/reference/session-management-compaction.md b/docs/reference/session-management-compaction.md index f6f8712392a..2e83120668d 100644 --- a/docs/reference/session-management-compaction.md +++ b/docs/reference/session-management-compaction.md @@ -54,6 +54,12 @@ OpenClaw persists sessions in two layers: transcript exceeds the checkpoint size cap, avoiding a second giant `.checkpoint.*.jsonl` copy. +Gateway history readers should avoid materializing the whole transcript unless +the surface explicitly needs arbitrary historical access. First-page history, +embedded chat history, restart recovery, and token/usage checks use bounded tail +reads. Full transcript scans go through the async transcript index, which is +cached by file path plus `mtimeMs`/`size` and shared across concurrent readers. + --- ## On-disk locations diff --git a/src/agents/main-session-restart-recovery.ts b/src/agents/main-session-restart-recovery.ts index 77071703df7..fdc6a8db64d 100644 --- a/src/agents/main-session-restart-recovery.ts +++ b/src/agents/main-session-restart-recovery.ts @@ -230,6 +230,11 @@ async function recoverStore(params: { entry.sessionId, params.storePath, entry.sessionFile, + { + mode: "recent", + maxMessages: 20, + maxBytes: 256 * 1024, + }, ); } catch (err) { log.warn(`failed to read transcript for ${sessionKey}: ${String(err)}`); diff --git a/src/agents/subagent-orphan-recovery.ts b/src/agents/subagent-orphan-recovery.ts index 0143175965b..ec4e0337daf 100644 --- a/src/agents/subagent-orphan-recovery.ts +++ b/src/agents/subagent-orphan-recovery.ts @@ -354,6 +354,11 @@ export async function recoverOrphanedSubagentSessions(params: { entry.sessionId, storePath, entry.sessionFile, + { + mode: "recent", + maxMessages: 200, + maxBytes: 1024 * 1024, + }, ); const lastHumanMessage = [...messages] .toReversed() diff --git a/src/agents/tools/embedded-gateway-stub.test.ts b/src/agents/tools/embedded-gateway-stub.test.ts index d6c8451ef61..75f19acaaaf 100644 --- a/src/agents/tools/embedded-gateway-stub.test.ts +++ b/src/agents/tools/embedded-gateway-stub.test.ts @@ -91,10 +91,20 @@ describe("embedded gateway stub", () => { maxChars: 100_000, maxMessages: 200, }); + expect(runtime.readSessionMessagesAsync).toHaveBeenCalledWith( + "sess-main", + "/tmp/openclaw-sessions.json", + undefined, + { + mode: "recent", + maxMessages: 200, + maxBytes: 1024 * 1024, + }, + ); expect(result.messages).toEqual(projectedMessages); }); - it("passes the full raw history to projection before limiting visible messages", async () => { + it("passes the requested recent history window to projection", async () => { const rawMessages = [ { role: "user", content: "visible older" }, { role: "assistant", content: "hidden newer" }, @@ -111,5 +121,15 @@ describe("embedded gateway stub", () => { maxChars: 100_000, maxMessages: 1, }); + expect(runtime.readSessionMessagesAsync).toHaveBeenCalledWith( + "sess-main", + "/tmp/openclaw-sessions.json", + undefined, + { + mode: "recent", + maxMessages: 1, + maxBytes: 1024 * 1024, + }, + ); }); }); diff --git a/src/agents/tools/embedded-gateway-stub.ts b/src/agents/tools/embedded-gateway-stub.ts index fff0974fdcd..b33617bcbbf 100644 --- a/src/agents/tools/embedded-gateway-stub.ts +++ b/src/agents/tools/embedded-gateway-stub.ts @@ -1,6 +1,7 @@ import type { OpenClawConfig } from "../../config/types.openclaw.js"; import type { CallGatewayOptions } from "../../gateway/call.js"; import type { SessionsListParams, SessionsResolveParams } from "../../gateway/protocol/index.js"; +import type { ReadSessionMessagesAsyncOptions } from "../../gateway/session-utils.fs.js"; import type { SessionsListResult } from "../../gateway/session-utils.types.js"; import type { SessionsResolveResult } from "../../gateway/sessions-resolve.js"; @@ -52,7 +53,8 @@ interface EmbeddedGatewayRuntime { readSessionMessagesAsync: ( sessionId: string, storePath: string, - sessionFile?: string, + sessionFile: string | undefined, + opts: ReadSessionMessagesAsyncOptions, ) => Promise; resolveSessionModelRef: ( cfg: OpenClawConfig, @@ -112,6 +114,11 @@ async function handleChatHistory(params: Record): Promise<{ const sessionId = entry?.sessionId as string | undefined; const sessionAgentId = rt.resolveSessionAgentId({ sessionKey, config: cfg }); const resolvedSessionModel = rt.resolveSessionModelRef(cfg, entry, sessionAgentId); + const hardMax = 1000; + const defaultLimit = 200; + const requested = typeof limit === "number" ? limit : defaultLimit; + const max = Math.min(hardMax, requested); + const maxHistoryBytes = rt.getMaxChatHistoryMessagesBytes(); const localMessages = sessionId && storePath @@ -119,6 +126,11 @@ async function handleChatHistory(params: Record): Promise<{ sessionId, storePath, entry?.sessionFile as string | undefined, + { + mode: "recent", + maxMessages: max, + maxBytes: Math.max(maxHistoryBytes * 2, 1024 * 1024), + }, ) : []; @@ -128,10 +140,6 @@ async function handleChatHistory(params: Record): Promise<{ localMessages, }); - const hardMax = 1000; - const defaultLimit = 200; - const requested = typeof limit === "number" ? limit : defaultLimit; - const max = Math.min(hardMax, requested); const effectiveMaxChars = rt.resolveEffectiveChatHistoryMaxChars(cfg); const normalized = rt.augmentChatHistoryWithCanvasBlocks( @@ -141,7 +149,6 @@ async function handleChatHistory(params: Record): Promise<{ }), ); - const maxHistoryBytes = rt.getMaxChatHistoryMessagesBytes(); const perMessageHardCap = Math.min(rt.CHAT_HISTORY_MAX_SINGLE_MESSAGE_BYTES, maxHistoryBytes); const replaced = rt.replaceOversizedChatHistoryMessages({ messages: normalized, diff --git a/src/auto-reply/reply/agent-runner-memory.test.ts b/src/auto-reply/reply/agent-runner-memory.test.ts index 04f35fcc42b..fd90f501d68 100644 --- a/src/auto-reply/reply/agent-runner-memory.test.ts +++ b/src/auto-reply/reply/agent-runner-memory.test.ts @@ -429,6 +429,168 @@ describe("runMemoryFlushIfNeeded", () => { }); }); + it("includes recent output tokens when deciding preflight compaction", async () => { + const sessionFile = path.join(rootDir, "session-usage.jsonl"); + await fs.writeFile( + sessionFile, + `${JSON.stringify({ + message: { + role: "assistant", + content: "large answer", + usage: { input: 90_000, output: 10_000 }, + }, + })}\n`, + "utf8", + ); + registerMemoryFlushPlanResolver(() => ({ + softThresholdTokens: 4_000, + forceFlushTranscriptBytes: 1_000_000_000, + reserveTokensFloor: 0, + prompt: "Pre-compaction memory flush.\nNO_REPLY", + systemPrompt: "Write memory to memory/YYYY-MM-DD.md.", + relativePath: "memory/2023-11-14.md", + })); + const sessionEntry: SessionEntry = { + sessionId: "session", + sessionFile, + updatedAt: Date.now(), + totalTokensFresh: false, + }; + + await runPreflightCompactionIfNeeded({ + cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } }, + followupRun: createTestFollowupRun({ + sessionId: "session", + sessionFile, + sessionKey: "main", + }), + defaultModel: "anthropic/claude-opus-4-6", + agentCfgContextTokens: 100_000, + sessionEntry, + sessionStore: { main: sessionEntry }, + sessionKey: "main", + storePath: path.join(rootDir, "sessions.json"), + isHeartbeat: false, + replyOperation: createReplyOperation(), + }); + + const compactCall = compactEmbeddedPiSessionMock.mock.calls[0]?.[0] as { + currentTokenCount?: number; + }; + expect(compactCall.currentTokenCount).toBeGreaterThanOrEqual(100_000); + }); + + it("uses the active run sessionFile when the session entry has no transcript path", async () => { + const sessionFile = path.join(rootDir, "active-run-session.jsonl"); + await fs.writeFile( + sessionFile, + `${JSON.stringify({ + message: { + role: "assistant", + content: "large answer", + usage: { input: 90_000, output: 8_000 }, + }, + })}\n`, + "utf8", + ); + registerMemoryFlushPlanResolver(() => ({ + softThresholdTokens: 4_000, + forceFlushTranscriptBytes: 1_000_000_000, + reserveTokensFloor: 0, + prompt: "Pre-compaction memory flush.\nNO_REPLY", + systemPrompt: "Write memory to memory/YYYY-MM-DD.md.", + relativePath: "memory/2023-11-14.md", + })); + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + totalTokensFresh: false, + }; + + await runPreflightCompactionIfNeeded({ + cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } }, + followupRun: createTestFollowupRun({ + sessionId: "session", + sessionFile, + sessionKey: "main", + }), + defaultModel: "anthropic/claude-opus-4-6", + agentCfgContextTokens: 100_000, + sessionEntry, + sessionStore: { main: sessionEntry }, + sessionKey: "main", + storePath: path.join(rootDir, "sessions.json"), + isHeartbeat: false, + replyOperation: createReplyOperation(), + }); + + expect(compactEmbeddedPiSessionMock).toHaveBeenCalledWith( + expect.objectContaining({ + sessionId: "session", + sessionFile: expect.stringContaining("active-run-session.jsonl"), + }), + ); + }); + + it("keeps preflight compaction conservative for content appended after latest usage", async () => { + const sessionFile = path.join(rootDir, "post-usage-tail-session.jsonl"); + await fs.writeFile( + sessionFile, + [ + JSON.stringify({ + message: { + role: "assistant", + content: "small answer", + usage: { input: 40_000, output: 2_000 }, + }, + }), + JSON.stringify({ + message: { + role: "tool", + content: `large interrupted tool output ${"x".repeat(450_000)}`, + }, + }), + ].join("\n"), + "utf8", + ); + registerMemoryFlushPlanResolver(() => ({ + softThresholdTokens: 4_000, + forceFlushTranscriptBytes: 1_000_000_000, + reserveTokensFloor: 0, + prompt: "Pre-compaction memory flush.\nNO_REPLY", + systemPrompt: "Write memory to memory/YYYY-MM-DD.md.", + relativePath: "memory/2023-11-14.md", + })); + const sessionEntry: SessionEntry = { + sessionId: "session", + sessionFile, + updatedAt: Date.now(), + totalTokensFresh: false, + }; + + await runPreflightCompactionIfNeeded({ + cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } }, + followupRun: createTestFollowupRun({ + sessionId: "session", + sessionFile, + sessionKey: "main", + }), + defaultModel: "anthropic/claude-opus-4-6", + agentCfgContextTokens: 100_000, + sessionEntry, + sessionStore: { main: sessionEntry }, + sessionKey: "main", + storePath: path.join(rootDir, "sessions.json"), + isHeartbeat: false, + replyOperation: createReplyOperation(), + }); + + const compactCall = compactEmbeddedPiSessionMock.mock.calls[0]?.[0] as { + currentTokenCount?: number; + }; + expect(compactCall.currentTokenCount).toBeGreaterThan(100_000); + }); + it("triggers preflight compaction when the active transcript exceeds the configured byte threshold", async () => { const sessionFile = path.join(rootDir, "large-session.jsonl"); await fs.writeFile( diff --git a/src/auto-reply/reply/agent-runner-memory.ts b/src/auto-reply/reply/agent-runner-memory.ts index 23682b710d2..52950259327 100644 --- a/src/auto-reply/reply/agent-runner-memory.ts +++ b/src/auto-reply/reply/agent-runner-memory.ts @@ -165,6 +165,7 @@ export type SessionTranscriptUsageSnapshot = { // transcript reads in time to flip memory-flush gating when needed. const TRANSCRIPT_OUTPUT_READ_BUFFER_TOKENS = 8192; const TRANSCRIPT_TAIL_CHUNK_BYTES = 64 * 1024; +const FALLBACK_TRANSCRIPT_BYTES_PER_TOKEN = 4; function parseUsageFromTranscriptLine(line: string): ReturnType | undefined { const trimmed = line.trim(); @@ -341,20 +342,64 @@ async function readLastNonzeroUsageFromSessionLog(logPath: string) { } } +type TranscriptTokenEstimate = { + promptTokens: number; + outputTokens?: number; + transcriptBytesTokens?: number; +}; + async function estimatePromptTokensFromSessionTranscript(params: { sessionId?: string; - storePath?: string; + sessionEntry?: SessionEntry; + sessionKey?: string; sessionFile?: string; -}): Promise { + storePath?: string; +}): Promise { const sessionId = normalizeOptionalString(params.sessionId); if (!sessionId) { return undefined; } + const fallbackSessionFile = normalizeOptionalString(params.sessionFile); + const sessionEntryForTranscript = + params.sessionEntry?.sessionFile || !fallbackSessionFile + ? params.sessionEntry + : ({ ...params.sessionEntry, sessionFile: fallbackSessionFile } as SessionEntry); try { + const snapshot = await readSessionLogSnapshot({ + sessionId, + sessionEntry: sessionEntryForTranscript, + sessionKey: params.sessionKey, + opts: { storePath: params.storePath }, + includeByteSize: true, + includeUsage: true, + }); + const transcriptBytesTokens = + typeof snapshot.byteSize === "number" && + Number.isFinite(snapshot.byteSize) && + snapshot.byteSize > 0 + ? Math.ceil(snapshot.byteSize / FALLBACK_TRANSCRIPT_BYTES_PER_TOKEN) + : undefined; + const promptTokens = snapshot.usage?.promptTokens; + if (typeof promptTokens === "number" && Number.isFinite(promptTokens) && promptTokens > 0) { + const outputTokens = snapshot.usage?.outputTokens; + return { + promptTokens: Math.ceil(promptTokens), + outputTokens: + typeof outputTokens === "number" && Number.isFinite(outputTokens) && outputTokens > 0 + ? Math.ceil(outputTokens) + : undefined, + transcriptBytesTokens, + }; + } const messages = (await readSessionMessagesAsync( sessionId, params.storePath, - params.sessionFile, + sessionEntryForTranscript?.sessionFile, + { + mode: "recent", + maxMessages: 200, + maxBytes: 1024 * 1024, + }, )) as AgentMessage[]; if (messages.length === 0) { return undefined; @@ -363,7 +408,10 @@ async function estimatePromptTokensFromSessionTranscript(params: { if (!Number.isFinite(estimatedTokens) || estimatedTokens <= 0) { return undefined; } - return Math.ceil(estimatedTokens); + return { + promptTokens: Math.ceil(estimatedTokens), + transcriptBytesTokens, + }; } catch { return undefined; } @@ -422,7 +470,10 @@ export async function runPreflightCompactionIfNeeded(params: { const transcriptSizeSnapshot = shouldCheckActiveTranscriptBytes ? await readSessionLogSnapshot({ sessionId: entry.sessionId, - sessionEntry: entry, + sessionEntry: + entry.sessionFile || !params.followupRun.run.sessionFile + ? entry + : { ...entry, sessionFile: params.followupRun.run.sessionFile }, sessionKey: params.sessionKey ?? params.followupRun.run.sessionKey, opts: { storePath: params.storePath }, includeByteSize: true, @@ -441,22 +492,44 @@ export async function runPreflightCompactionIfNeeded(params: { const promptTokenEstimate = estimatePromptTokensForMemoryFlush( params.promptForEstimate ?? params.followupRun.prompt, ); - const transcriptPromptTokens = + const transcriptUsageTokens = typeof freshPersistedTokens === "number" ? undefined : await estimatePromptTokensFromSessionTranscript({ sessionId: entry.sessionId, - storePath: params.storePath, + sessionEntry: entry, + sessionKey: params.sessionKey ?? params.followupRun.run.sessionKey, sessionFile: entry.sessionFile ?? params.followupRun.run.sessionFile, + storePath: params.storePath, }); - const projectedTokenCount = - typeof transcriptPromptTokens === "number" - ? resolveEffectivePromptTokens(transcriptPromptTokens, undefined, promptTokenEstimate) + const stalePersistedPromptTokens = hasPersistedTotalTokens + ? Math.floor(persistedTotalTokens) + : undefined; + const transcriptPromptTokens = transcriptUsageTokens?.promptTokens; + const transcriptOutputTokens = transcriptUsageTokens?.outputTokens; + const transcriptBytesProjectedTokens = + typeof transcriptUsageTokens?.transcriptBytesTokens === "number" + ? resolveEffectivePromptTokens( + transcriptUsageTokens.transcriptBytesTokens, + undefined, + promptTokenEstimate, + ) : undefined; + const usageProjectedTokenCount = + typeof transcriptPromptTokens === "number" + ? resolveEffectivePromptTokens( + transcriptPromptTokens, + transcriptOutputTokens, + promptTokenEstimate, + ) + : undefined; + const projectedTokenCount = Math.max( + usageProjectedTokenCount ?? 0, + transcriptBytesProjectedTokens ?? 0, + stalePersistedPromptTokens ?? 0, + ); const tokenCountForCompaction = - typeof projectedTokenCount === "number" && - Number.isFinite(projectedTokenCount) && - projectedTokenCount > 0 + Number.isFinite(projectedTokenCount) && projectedTokenCount > 0 ? projectedTokenCount : undefined; diff --git a/src/auto-reply/reply/session-fork.runtime.test.ts b/src/auto-reply/reply/session-fork.runtime.test.ts index a1254dda20b..e67a38a5ca5 100644 --- a/src/auto-reply/reply/session-fork.runtime.test.ts +++ b/src/auto-reply/reply/session-fork.runtime.test.ts @@ -21,7 +21,7 @@ afterEach(async () => { }); describe("resolveParentForkTokenCountRuntime", () => { - it("falls back to transcript-estimated tokens when cached totals are stale", async () => { + it("falls back to recent transcript usage when cached totals are stale", async () => { const root = await makeRoot("openclaw-parent-fork-token-estimate-"); const sessionsDir = path.join(root, "sessions"); await fs.mkdir(sessionsDir); @@ -38,7 +38,7 @@ describe("resolveParentForkTokenCountRuntime", () => { }), ]; for (let index = 0; index < 40; index += 1) { - const body = `turn-${index} ${"x".repeat(12_000)}`; + const body = `turn-${index} ${"x".repeat(200)}`; lines.push( JSON.stringify({ type: "message", @@ -52,7 +52,11 @@ describe("resolveParentForkTokenCountRuntime", () => { id: `a${index}`, parentId: `u${index}`, timestamp: new Date().toISOString(), - message: { role: "assistant", content: body }, + message: { + role: "assistant", + content: body, + usage: index === 39 ? { input: 90_000, output: 20_000 } : undefined, + }, }), ); } @@ -71,6 +75,149 @@ describe("resolveParentForkTokenCountRuntime", () => { storePath: path.join(root, "sessions.json"), }); + expect(tokens).toBe(110_000); + }); + + it("falls back to a conservative byte estimate when stale parent transcript has no usage", async () => { + const root = await makeRoot("openclaw-parent-fork-byte-estimate-"); + const sessionsDir = path.join(root, "sessions"); + await fs.mkdir(sessionsDir); + + const sessionId = "parent-no-usage-transcript"; + const sessionFile = path.join(sessionsDir, "parent.jsonl"); + const lines = [ + JSON.stringify({ + type: "session", + version: 3, + id: sessionId, + timestamp: new Date().toISOString(), + cwd: process.cwd(), + }), + ]; + for (let index = 0; index < 24; index += 1) { + lines.push( + JSON.stringify({ + type: "message", + id: `u${index}`, + parentId: index === 0 ? null : `a${index - 1}`, + timestamp: new Date().toISOString(), + message: { role: "user", content: `turn-${index} ${"x".repeat(24_000)}` }, + }), + ); + } + await fs.writeFile(sessionFile, `${lines.join("\n")}\n`, "utf-8"); + + const entry: SessionEntry = { + sessionId, + sessionFile, + updatedAt: Date.now(), + totalTokensFresh: false, + }; + + const tokens = await resolveParentForkTokenCountRuntime({ + parentEntry: entry, + storePath: path.join(root, "sessions.json"), + }); + + expect(tokens).toBeGreaterThan(100_000); + }); + + it("uses the latest usage snapshot instead of tail aggregates for parent fork checks", async () => { + const root = await makeRoot("openclaw-parent-fork-latest-usage-"); + const sessionsDir = path.join(root, "sessions"); + await fs.mkdir(sessionsDir); + + const sessionId = "parent-multiple-usage-transcript"; + const sessionFile = path.join(sessionsDir, "parent.jsonl"); + await fs.writeFile( + sessionFile, + [ + JSON.stringify({ + type: "session", + version: 3, + id: sessionId, + timestamp: new Date().toISOString(), + cwd: process.cwd(), + }), + JSON.stringify({ + message: { + role: "assistant", + content: "older", + usage: { input: 60_000, output: 5_000 }, + }, + }), + JSON.stringify({ + message: { + role: "assistant", + content: "latest", + usage: { input: 70_000, output: 8_000 }, + }, + }), + ].join("\n"), + "utf-8", + ); + + const entry: SessionEntry = { + sessionId, + sessionFile, + updatedAt: Date.now(), + totalTokensFresh: false, + }; + + const tokens = await resolveParentForkTokenCountRuntime({ + parentEntry: entry, + storePath: path.join(root, "sessions.json"), + }); + + expect(tokens).toBe(78_000); + }); + + it("keeps parent fork checks conservative for content appended after latest usage", async () => { + const root = await makeRoot("openclaw-parent-fork-post-usage-tail-"); + const sessionsDir = path.join(root, "sessions"); + await fs.mkdir(sessionsDir); + + const sessionId = "parent-post-usage-tail"; + const sessionFile = path.join(sessionsDir, "parent.jsonl"); + await fs.writeFile( + sessionFile, + [ + JSON.stringify({ + type: "session", + version: 3, + id: sessionId, + timestamp: new Date().toISOString(), + cwd: process.cwd(), + }), + JSON.stringify({ + message: { + role: "assistant", + content: "latest model call", + usage: { input: 40_000, output: 2_000 }, + }, + }), + JSON.stringify({ + message: { + role: "tool", + content: `large appended tool result ${"x".repeat(450_000)}`, + }, + }), + ].join("\n"), + "utf-8", + ); + + const entry: SessionEntry = { + sessionId, + sessionFile, + updatedAt: Date.now(), + totalTokensFresh: false, + }; + + const tokens = await resolveParentForkTokenCountRuntime({ + parentEntry: entry, + storePath: path.join(root, "sessions.json"), + }); + expect(tokens).toBeGreaterThan(100_000); }); }); diff --git a/src/auto-reply/reply/session-fork.runtime.ts b/src/auto-reply/reply/session-fork.runtime.ts index 6d9dba8989d..91a649dd1e8 100644 --- a/src/auto-reply/reply/session-fork.runtime.ts +++ b/src/auto-reply/reply/session-fork.runtime.ts @@ -1,7 +1,6 @@ import crypto from "node:crypto"; import fs from "node:fs/promises"; import path from "node:path"; -import type { AgentMessage } from "@mariozechner/pi-agent-core"; import { CURRENT_SESSION_VERSION, migrateSessionEntries, @@ -10,13 +9,16 @@ import { type SessionEntry as PiSessionEntry, type SessionHeader, } from "@mariozechner/pi-coding-agent"; -import { estimateMessagesTokens } from "../../agents/compaction.js"; -import { resolveSessionFilePath } from "../../config/sessions/paths.js"; +import { derivePromptTokens } from "../../agents/usage.js"; +import { + resolveSessionFilePath, + resolveSessionFilePathOptions, +} from "../../config/sessions/paths.js"; import { resolveFreshSessionTotalTokens, type SessionEntry as StoreSessionEntry, } from "../../config/sessions/types.js"; -import { readSessionMessagesAsync } from "../../gateway/session-utils.fs.js"; +import { readLatestRecentSessionUsageFromTranscriptAsync } from "../../gateway/session-utils.fs.js"; type ForkSourceTranscript = { cwd: string; @@ -26,12 +28,42 @@ type ForkSourceTranscript = { labelsToWrite: Array<{ targetId: string; label: string; timestamp: string }>; }; +const FALLBACK_TRANSCRIPT_BYTES_PER_TOKEN = 4; + function resolvePositiveTokenCount(value: number | undefined): number | undefined { return typeof value === "number" && Number.isFinite(value) && value > 0 ? Math.floor(value) : undefined; } +function maxPositiveTokenCount(...values: Array): number | undefined { + let max: number | undefined; + for (const value of values) { + const normalized = resolvePositiveTokenCount(value); + if (typeof normalized === "number" && (max === undefined || normalized > max)) { + max = normalized; + } + } + return max; +} + +async function estimateParentTranscriptTokensFromBytes(params: { + parentEntry: StoreSessionEntry; + storePath: string; +}): Promise { + try { + const filePath = resolveSessionFilePath( + params.parentEntry.sessionId, + params.parentEntry, + resolveSessionFilePathOptions({ storePath: params.storePath }), + ); + const stat = await fs.stat(filePath); + return resolvePositiveTokenCount(Math.ceil(stat.size / FALLBACK_TRANSCRIPT_BYTES_PER_TOKEN)); + } catch { + return undefined; + } +} + export async function resolveParentForkTokenCountRuntime(params: { parentEntry: StoreSessionEntry; storePath: string; @@ -41,26 +73,36 @@ export async function resolveParentForkTokenCountRuntime(params: { return freshPersistedTokens; } + const cachedTokens = resolvePositiveTokenCount(params.parentEntry.totalTokens); + const byteEstimateTokens = await estimateParentTranscriptTokensFromBytes(params); try { - const transcriptMessages = (await readSessionMessagesAsync( + const usage = await readLatestRecentSessionUsageFromTranscriptAsync( params.parentEntry.sessionId, params.storePath, params.parentEntry.sessionFile, - )) as AgentMessage[]; - if (transcriptMessages.length > 0) { - const estimatedTokens = estimateMessagesTokens(transcriptMessages); - const transcriptTokens = resolvePositiveTokenCount( - Number.isFinite(estimatedTokens) ? Math.ceil(estimatedTokens) : undefined, + undefined, + 1024 * 1024, + ); + const promptTokens = resolvePositiveTokenCount( + derivePromptTokens({ + input: usage?.inputTokens, + cacheRead: usage?.cacheRead, + cacheWrite: usage?.cacheWrite, + }), + ); + const outputTokens = resolvePositiveTokenCount(usage?.outputTokens); + if (typeof promptTokens === "number") { + return maxPositiveTokenCount( + promptTokens + (outputTokens ?? 0), + cachedTokens, + byteEstimateTokens, ); - if (typeof transcriptTokens === "number") { - return transcriptTokens; - } } } catch { - // Fall back to cached totals when the parent transcript cannot be read. + // Fall back to cached totals when recent transcript usage cannot be read. } - return resolvePositiveTokenCount(params.parentEntry.totalTokens); + return maxPositiveTokenCount(cachedTokens, byteEstimateTokens); } function isSessionEntry(entry: FileEntry): entry is PiSessionEntry { diff --git a/src/gateway/gateway-models.profiles.live.test.ts b/src/gateway/gateway-models.profiles.live.test.ts index 000ab1b63ab..253ce4b2271 100644 --- a/src/gateway/gateway-models.profiles.live.test.ts +++ b/src/gateway/gateway-models.profiles.live.test.ts @@ -1163,7 +1163,10 @@ async function readSessionAssistantTexts(sessionKey: string, modelKey?: string): if (!entry?.sessionId) { return []; } - const messages = await readSessionMessagesAsync(entry.sessionId, storePath, entry.sessionFile); + const messages = await readSessionMessagesAsync(entry.sessionId, storePath, entry.sessionFile, { + mode: "full", + reason: "live model assistant text verification", + }); const assistantTexts: string[] = []; for (const message of messages) { if (!message || typeof message !== "object") { diff --git a/src/gateway/managed-image-attachments.ts b/src/gateway/managed-image-attachments.ts index dd05be7a1bb..248f366ec7d 100644 --- a/src/gateway/managed-image-attachments.ts +++ b/src/gateway/managed-image-attachments.ts @@ -717,7 +717,10 @@ async function getSessionManagedOutgoingAttachmentIndex( } } - const messages = await readSessionMessagesAsync(sessionId, storePath, entry.sessionFile); + const messages = await readSessionMessagesAsync(sessionId, storePath, entry.sessionFile, { + mode: "full", + reason: "managed outgoing attachment index", + }); const index: SessionManagedOutgoingAttachmentIndex = new Set(); for (const message of messages) { const meta = (message as { __openclaw?: { id?: string } } | null)?.__openclaw; diff --git a/src/gateway/server-methods/artifacts.ts b/src/gateway/server-methods/artifacts.ts index bdace8f9366..6d82d4c7e72 100644 --- a/src/gateway/server-methods/artifacts.ts +++ b/src/gateway/server-methods/artifacts.ts @@ -313,16 +313,25 @@ async function loadArtifacts( return { sessionKey, artifacts: [] }; } const artifacts: ArtifactRecord[] = []; - await visitSessionMessagesAsync(sessionId, storePath, entry?.sessionFile, (message, seq) => { - collectArtifactsFromMessage({ - message, - messageFallbackSeq: seq, - artifacts, - sessionKey, - runId: query.runId, - taskId: query.taskId, - }); - }); + await visitSessionMessagesAsync( + sessionId, + storePath, + entry?.sessionFile, + (message, seq) => { + collectArtifactsFromMessage({ + message, + messageFallbackSeq: seq, + artifacts, + sessionKey, + runId: query.runId, + taskId: query.taskId, + }); + }, + { + mode: "full", + reason: "artifact query transcript scan", + }, + ); return { sessionKey, artifacts, diff --git a/src/gateway/session-history-state.ts b/src/gateway/session-history-state.ts index 7c70148a466..5673bcb9ad7 100644 --- a/src/gateway/session-history-state.ts +++ b/src/gateway/session-history-state.ts @@ -288,6 +288,10 @@ export class SessionHistorySseState { this.target.sessionId, this.target.storePath, this.target.sessionFile, + { + mode: "full", + reason: "session history cursor pagination", + }, ), }; } diff --git a/src/gateway/session-reset-service.ts b/src/gateway/session-reset-service.ts index 60153f593a6..12933483e98 100644 --- a/src/gateway/session-reset-service.ts +++ b/src/gateway/session-reset-service.ts @@ -454,7 +454,10 @@ async function emitGatewayBeforeResetPluginHook(params: { let messages: unknown[] = []; try { if (typeof sessionId === "string" && sessionId.trim().length > 0) { - messages = await readSessionMessagesAsync(sessionId, params.storePath, sessionFile); + messages = await readSessionMessagesAsync(sessionId, params.storePath, sessionFile, { + mode: "full", + reason: "before_reset hook payload", + }); } } catch (err) { logVerbose( diff --git a/src/gateway/session-transcript-index.fs.ts b/src/gateway/session-transcript-index.fs.ts index 38a5228e2e9..4578bea84f7 100644 --- a/src/gateway/session-transcript-index.fs.ts +++ b/src/gateway/session-transcript-index.fs.ts @@ -38,6 +38,14 @@ type CacheEntry = { }; const transcriptIndexCache = new Map(); +const transcriptIndexBuilds = new Map< + string, + { + mtimeMs: number; + size: number; + promise: Promise; + } +>(); function normalizeOptionalString(value: unknown): string | undefined { return typeof value === "string" && value.trim().length > 0 ? value : undefined; @@ -66,6 +74,7 @@ function setCachedIndex(filePath: string, entry: CacheEntry): void { export function clearSessionTranscriptIndexCache(): void { transcriptIndexCache.clear(); + transcriptIndexBuilds.clear(); } function isIndexableTranscriptRecord(record: unknown): record is ParsedTranscriptRecord { @@ -233,7 +242,22 @@ export async function readSessionTranscriptIndex( if (cached && cached.mtimeMs === stat.mtimeMs && cached.size === stat.size) { return touchCachedIndex(filePath, cached); } - const index = await buildSessionTranscriptIndex(filePath, stat); + const inFlight = transcriptIndexBuilds.get(filePath); + if (inFlight && inFlight.mtimeMs === stat.mtimeMs && inFlight.size === stat.size) { + return await inFlight.promise; + } + const promise = buildSessionTranscriptIndex(filePath, stat); + transcriptIndexBuilds.set(filePath, { + mtimeMs: stat.mtimeMs, + size: stat.size, + promise, + }); + const index = await promise.finally(() => { + const current = transcriptIndexBuilds.get(filePath); + if (current?.promise === promise) { + transcriptIndexBuilds.delete(filePath); + } + }); setCachedIndex(filePath, { mtimeMs: stat.mtimeMs, size: stat.size, diff --git a/src/gateway/session-utils.fs.test.ts b/src/gateway/session-utils.fs.test.ts index 17e857ca4a6..cf0a6d4caaa 100644 --- a/src/gateway/session-utils.fs.test.ts +++ b/src/gateway/session-utils.fs.test.ts @@ -11,6 +11,8 @@ import { readLastMessagePreviewFromTranscript, readLatestSessionUsageFromTranscript, readLatestSessionUsageFromTranscriptAsync, + readLatestRecentSessionUsageFromTranscriptAsync, + readRecentSessionUsageFromTranscriptAsync, readRecentSessionUsageFromTranscript, readRecentSessionMessagesAsync, readRecentSessionMessages, @@ -786,7 +788,10 @@ describe("readSessionMessages", () => { const readFileSpy = vi.spyOn(fs, "readFileSync"); try { - const messages = await readSessionMessagesAsync(sessionId, storePath); + const messages = await readSessionMessagesAsync(sessionId, storePath, undefined, { + mode: "full", + reason: "test active branch selection", + }); expect(messages.map((message) => (message as { content?: unknown }).content)).toEqual([ "root", "active branch", @@ -822,6 +827,104 @@ describe("readSessionMessages", () => { } }); + test("shares concurrent async transcript index builds", async () => { + const sessionId = "test-session-index-cache-concurrent"; + writeTranscript(tmpDir, sessionId, [ + { type: "session", version: 1, id: sessionId }, + { message: { role: "user", content: "hello" } }, + { message: { role: "assistant", content: "hi" } }, + ]); + clearSessionTranscriptIndexCache(); + + const openSpy = vi.spyOn(fs.promises, "open"); + try { + await expect( + Promise.all( + Array.from({ length: 8 }, () => readSessionMessageCountAsync(sessionId, storePath)), + ), + ).resolves.toEqual(Array.from({ length: 8 }, () => 2)); + expect(openSpy).toHaveBeenCalledTimes(1); + } finally { + openSpy.mockRestore(); + } + }); + + test("readSessionMessagesAsync recent mode honors byte caps", async () => { + const sessionId = "test-session-async-recent-mode"; + writeTranscript(tmpDir, sessionId, [ + { type: "session", version: 1, id: sessionId }, + { message: { role: "user", content: "older" } }, + { message: { role: "assistant", content: "x".repeat(32 * 1024) } }, + { message: { role: "user", content: "latest" } }, + ]); + clearSessionTranscriptIndexCache(); + const openSpy = vi.spyOn(fs.promises, "open"); + + try { + const messages = await readSessionMessagesAsync(sessionId, storePath, undefined, { + mode: "recent", + maxMessages: 1, + maxBytes: 2048, + }); + expect(messages).toHaveLength(1); + expect(messages[0]).toMatchObject({ role: "user", content: "latest" }); + expect(JSON.stringify(messages)).not.toContain("older"); + expect(openSpy).toHaveBeenCalledTimes(1); + } finally { + openSpy.mockRestore(); + } + }); + + test("reads recent session usage asynchronously from the transcript tail", async () => { + const sessionId = "test-session-async-recent-usage"; + writeTranscript(tmpDir, sessionId, [ + { type: "session", version: 1, id: sessionId }, + { message: { role: "assistant", content: "older", usage: { input: 10, output: 1 } } }, + { message: { role: "assistant", content: "x".repeat(32 * 1024) } }, + { message: { role: "assistant", content: "latest", usage: { input: 42, output: 7 } } }, + ]); + + const usage = await readRecentSessionUsageFromTranscriptAsync( + sessionId, + storePath, + undefined, + undefined, + 2048, + ); + + expect(usage).toMatchObject({ + inputTokens: 42, + outputTokens: 7, + }); + }); + + test("reads latest recent session usage separately from tail aggregates", async () => { + const sessionId = "test-session-async-latest-recent-usage"; + writeTranscript(tmpDir, sessionId, [ + { type: "session", version: 1, id: sessionId }, + { message: { role: "assistant", content: "older", usage: { input: 50, output: 5 } } }, + { message: { role: "assistant", content: "latest", usage: { input: 70, output: 9 } } }, + ]); + + const aggregate = await readRecentSessionUsageFromTranscriptAsync( + sessionId, + storePath, + undefined, + undefined, + 2048, + ); + const latest = await readLatestRecentSessionUsageFromTranscriptAsync( + sessionId, + storePath, + undefined, + undefined, + 2048, + ); + + expect(aggregate).toMatchObject({ inputTokens: 120, outputTokens: 14 }); + expect(latest).toMatchObject({ inputTokens: 70, outputTokens: 9 }); + }); + test("tails transcript lines for manual compaction without loading the whole file", () => { const sessionId = "test-session-line-tail"; const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`); diff --git a/src/gateway/session-utils.fs.ts b/src/gateway/session-utils.fs.ts index 7bcb08b1e16..b4ca9eb2e07 100644 --- a/src/gateway/session-utils.fs.ts +++ b/src/gateway/session-utils.fs.ts @@ -153,12 +153,21 @@ export function readSessionMessages( return transcriptRecordsToMessages(readSelectedTranscriptRecords(filePath)); } -type ReadRecentSessionMessagesOptions = { +export type ReadRecentSessionMessagesOptions = { maxMessages: number; maxBytes?: number; maxLines?: number; }; +export type ReadSessionMessagesAsyncOptions = + | { + mode: "full"; + reason: string; + } + | ({ + mode: "recent"; + } & ReadRecentSessionMessagesOptions); + type ReadRecentSessionMessagesResult = { messages: unknown[]; totalMessages: number; @@ -472,8 +481,13 @@ export function readSessionMessageCount( export async function readSessionMessagesAsync( sessionId: string, storePath: string | undefined, - sessionFile?: string, + sessionFile: string | undefined, + opts: ReadSessionMessagesAsyncOptions, ): Promise { + if (opts.mode === "recent") { + const { mode: _mode, ...recentOpts } = opts; + return await readRecentSessionMessagesAsync(sessionId, storePath, sessionFile, recentOpts); + } const filePath = findExistingTranscriptPath(sessionId, storePath, sessionFile); if (!filePath) { return []; @@ -487,6 +501,7 @@ export async function visitSessionMessagesAsync( storePath: string | undefined, sessionFile: string | undefined, visit: (message: unknown, seq: number) => void, + _opts: { mode: "full"; reason: string }, ): Promise { const filePath = findExistingTranscriptPath(sessionId, storePath, sessionFile); if (!filePath) { @@ -1027,7 +1042,91 @@ function resolvePositiveUsageNumber(value: unknown): number | undefined { return typeof value === "number" && Number.isFinite(value) && value > 0 ? value : undefined; } -function extractLatestUsageFromTranscriptLines( +function extractUsageSnapshotFromTranscriptLine( + line: string, +): SessionTranscriptUsageSnapshot | null { + try { + const parsed = JSON.parse(line) as Record; + const message = + parsed.message && typeof parsed.message === "object" && !Array.isArray(parsed.message) + ? (parsed.message as Record) + : undefined; + if (!message) { + return null; + } + const role = typeof message.role === "string" ? message.role : undefined; + if (role && role !== "assistant") { + return null; + } + const usageRaw = + message.usage && typeof message.usage === "object" && !Array.isArray(message.usage) + ? message.usage + : parsed.usage && typeof parsed.usage === "object" && !Array.isArray(parsed.usage) + ? parsed.usage + : undefined; + const usage = normalizeUsage(usageRaw); + const totalTokens = resolvePositiveUsageNumber(deriveSessionTotalTokens({ usage })); + const costUsd = extractTranscriptUsageCost(usageRaw); + const modelProvider = + typeof message.provider === "string" + ? message.provider.trim() + : typeof parsed.provider === "string" + ? parsed.provider.trim() + : undefined; + const model = + typeof message.model === "string" + ? message.model.trim() + : typeof parsed.model === "string" + ? parsed.model.trim() + : undefined; + const isDeliveryMirror = modelProvider === "openclaw" && model === "delivery-mirror"; + const hasMeaningfulUsage = + hasNonzeroUsage(usage) || + typeof totalTokens === "number" || + (typeof costUsd === "number" && Number.isFinite(costUsd)); + const hasModelIdentity = Boolean(modelProvider || model); + if (!hasMeaningfulUsage && !hasModelIdentity) { + return null; + } + if (isDeliveryMirror && !hasMeaningfulUsage) { + return null; + } + + const snapshot: SessionTranscriptUsageSnapshot = {}; + if (!isDeliveryMirror) { + if (modelProvider) { + snapshot.modelProvider = modelProvider; + } + if (model) { + snapshot.model = model; + } + } + if (typeof usage?.input === "number" && Number.isFinite(usage.input)) { + snapshot.inputTokens = usage.input; + } + if (typeof usage?.output === "number" && Number.isFinite(usage.output)) { + snapshot.outputTokens = usage.output; + } + if (typeof usage?.cacheRead === "number" && Number.isFinite(usage.cacheRead)) { + snapshot.cacheRead = usage.cacheRead; + } + if (typeof usage?.cacheWrite === "number" && Number.isFinite(usage.cacheWrite)) { + snapshot.cacheWrite = usage.cacheWrite; + } + if (typeof totalTokens === "number") { + snapshot.totalTokens = totalTokens; + snapshot.totalTokensFresh = true; + } + if (typeof costUsd === "number" && Number.isFinite(costUsd)) { + snapshot.costUsd = costUsd; + } + return snapshot; + } catch { + return null; + } +} + +function extractAggregateUsageFromTranscriptLines( lines: Iterable, ): SessionTranscriptUsageSnapshot | null { const snapshot: SessionTranscriptUsageSnapshot = {}; @@ -1044,88 +1143,40 @@ function extractLatestUsageFromTranscriptLines( let sawCost = false; for (const line of lines) { - try { - const parsed = JSON.parse(line) as Record; - const message = - parsed.message && typeof parsed.message === "object" && !Array.isArray(parsed.message) - ? (parsed.message as Record) - : undefined; - if (!message) { - continue; - } - const role = typeof message.role === "string" ? message.role : undefined; - if (role && role !== "assistant") { - continue; - } - const usageRaw = - message.usage && typeof message.usage === "object" && !Array.isArray(message.usage) - ? message.usage - : parsed.usage && typeof parsed.usage === "object" && !Array.isArray(parsed.usage) - ? parsed.usage - : undefined; - const usage = normalizeUsage(usageRaw); - const totalTokens = resolvePositiveUsageNumber(deriveSessionTotalTokens({ usage })); - const costUsd = extractTranscriptUsageCost(usageRaw); - const modelProvider = - typeof message.provider === "string" - ? message.provider.trim() - : typeof parsed.provider === "string" - ? parsed.provider.trim() - : undefined; - const model = - typeof message.model === "string" - ? message.model.trim() - : typeof parsed.model === "string" - ? parsed.model.trim() - : undefined; - const isDeliveryMirror = modelProvider === "openclaw" && model === "delivery-mirror"; - const hasMeaningfulUsage = - hasNonzeroUsage(usage) || - typeof totalTokens === "number" || - (typeof costUsd === "number" && Number.isFinite(costUsd)); - const hasModelIdentity = Boolean(modelProvider || model); - if (!hasMeaningfulUsage && !hasModelIdentity) { - continue; - } - if (isDeliveryMirror && !hasMeaningfulUsage) { - continue; - } - - sawSnapshot = true; - if (!isDeliveryMirror) { - if (modelProvider) { - snapshot.modelProvider = modelProvider; - } - if (model) { - snapshot.model = model; - } - } - if (typeof usage?.input === "number" && Number.isFinite(usage.input)) { - inputTokens += usage.input; - sawInputTokens = true; - } - if (typeof usage?.output === "number" && Number.isFinite(usage.output)) { - outputTokens += usage.output; - sawOutputTokens = true; - } - if (typeof usage?.cacheRead === "number" && Number.isFinite(usage.cacheRead)) { - cacheRead += usage.cacheRead; - sawCacheRead = true; - } - if (typeof usage?.cacheWrite === "number" && Number.isFinite(usage.cacheWrite)) { - cacheWrite += usage.cacheWrite; - sawCacheWrite = true; - } - if (typeof totalTokens === "number") { - snapshot.totalTokens = totalTokens; - snapshot.totalTokensFresh = true; - } - if (typeof costUsd === "number" && Number.isFinite(costUsd)) { - costUsdTotal += costUsd; - sawCost = true; - } - } catch { - // skip malformed lines + const current = extractUsageSnapshotFromTranscriptLine(line); + if (!current) { + continue; + } + sawSnapshot = true; + if (current.modelProvider) { + snapshot.modelProvider = current.modelProvider; + } + if (current.model) { + snapshot.model = current.model; + } + if (typeof current.inputTokens === "number") { + inputTokens += current.inputTokens; + sawInputTokens = true; + } + if (typeof current.outputTokens === "number") { + outputTokens += current.outputTokens; + sawOutputTokens = true; + } + if (typeof current.cacheRead === "number") { + cacheRead += current.cacheRead; + sawCacheRead = true; + } + if (typeof current.cacheWrite === "number") { + cacheWrite += current.cacheWrite; + sawCacheWrite = true; + } + if (typeof current.totalTokens === "number") { + snapshot.totalTokens = current.totalTokens; + snapshot.totalTokensFresh = true; + } + if (typeof current.costUsd === "number" && Number.isFinite(current.costUsd)) { + costUsdTotal += current.costUsd; + sawCost = true; } } @@ -1150,10 +1201,20 @@ function extractLatestUsageFromTranscriptLines( return snapshot; } -function extractLatestUsageFromTranscriptChunk( +function extractLatestUsageFromTranscriptLines( + lines: Iterable, +): SessionTranscriptUsageSnapshot | null { + let latest: SessionTranscriptUsageSnapshot | null = null; + for (const line of lines) { + latest = extractUsageSnapshotFromTranscriptLine(line) ?? latest; + } + return latest; +} + +function extractAggregateUsageFromTranscriptChunk( chunk: string, ): SessionTranscriptUsageSnapshot | null { - return extractLatestUsageFromTranscriptLines( + return extractAggregateUsageFromTranscriptLines( chunk.split(/\r?\n/).filter((line) => line.trim().length > 0), ); } @@ -1175,7 +1236,7 @@ export function readLatestSessionUsageFromTranscript( return null; } const chunk = fs.readFileSync(fd, "utf-8"); - return extractLatestUsageFromTranscriptChunk(chunk); + return extractAggregateUsageFromTranscriptChunk(chunk); }); } @@ -1201,6 +1262,62 @@ export async function readLatestSessionUsageFromTranscriptAsync( lines.push(line); } }); + return extractAggregateUsageFromTranscriptLines(lines); + } catch { + return null; + } +} + +export async function readRecentSessionUsageFromTranscriptAsync( + sessionId: string, + storePath: string | undefined, + sessionFile: string | undefined, + agentId: string | undefined, + maxBytes: number, +): Promise { + const filePath = findExistingTranscriptPath(sessionId, storePath, sessionFile, agentId); + if (!filePath) { + return null; + } + + try { + const stat = await fs.promises.stat(filePath); + if (stat.size === 0) { + return null; + } + const lines = await readRecentTranscriptTailLinesAsync(filePath, stat, { + maxMessages: 1, + maxLines: 1000, + maxBytes, + }); + return extractAggregateUsageFromTranscriptLines(lines); + } catch { + return null; + } +} + +export async function readLatestRecentSessionUsageFromTranscriptAsync( + sessionId: string, + storePath: string | undefined, + sessionFile: string | undefined, + agentId: string | undefined, + maxBytes: number, +): Promise { + const filePath = findExistingTranscriptPath(sessionId, storePath, sessionFile, agentId); + if (!filePath) { + return null; + } + + try { + const stat = await fs.promises.stat(filePath); + if (stat.size === 0) { + return null; + } + const lines = await readRecentTranscriptTailLinesAsync(filePath, stat, { + maxMessages: 1, + maxLines: 1000, + maxBytes, + }); return extractLatestUsageFromTranscriptLines(lines); } catch { return null; @@ -1236,7 +1353,7 @@ export function readRecentSessionUsageFromTranscript( .split(/\r?\n/) .slice(readStart > 0 ? 1 : 0) .join("\n"); - return extractLatestUsageFromTranscriptChunk(chunk); + return extractAggregateUsageFromTranscriptChunk(chunk); }); } diff --git a/src/gateway/session-utils.ts b/src/gateway/session-utils.ts index d7e45f5ba01..3c3b6e5c1b5 100644 --- a/src/gateway/session-utils.ts +++ b/src/gateway/session-utils.ts @@ -106,6 +106,8 @@ export { readFirstUserMessageFromTranscript, readLastMessagePreviewFromTranscript, readLatestSessionUsageFromTranscriptAsync, + readLatestRecentSessionUsageFromTranscriptAsync, + readRecentSessionUsageFromTranscriptAsync, readRecentSessionMessagesAsync, readRecentSessionMessagesWithStatsAsync, readRecentSessionTranscriptLines, @@ -118,6 +120,7 @@ export { visitSessionMessagesAsync, resolveSessionTranscriptCandidates, } from "./session-utils.fs.js"; +export type { ReadSessionMessagesAsyncOptions } from "./session-utils.fs.js"; export { canonicalizeSpawnedByForAgent, resolveSessionStoreKey } from "./session-store-key.js"; export type { GatewayAgentRow, diff --git a/src/gateway/sessions-history-http.ts b/src/gateway/sessions-history-http.ts index d5d0e8570f1..31dcf597c45 100644 --- a/src/gateway/sessions-history-http.ts +++ b/src/gateway/sessions-history-http.ts @@ -168,7 +168,10 @@ export async function handleSessionHistoryHttpRequest( const rawSnapshot = boundedSnapshot?.messages ?? (entry?.sessionId - ? await readSessionMessagesAsync(entry.sessionId, target.storePath, entry.sessionFile) + ? await readSessionMessagesAsync(entry.sessionId, target.storePath, entry.sessionFile, { + mode: "full", + reason: "session history cursor pagination", + }) : []); const historySnapshot = buildSessionHistorySnapshot({ rawMessages: rawSnapshot, diff --git a/src/tui/embedded-backend.ts b/src/tui/embedded-backend.ts index 8e0f8fbcde8..265c0b41ff8 100644 --- a/src/tui/embedded-backend.ts +++ b/src/tui/embedded-backend.ts @@ -196,16 +196,21 @@ export class EmbeddedTuiBackend implements TuiBackend { const sessionId = entry?.sessionId; const sessionAgentId = resolveSessionAgentId({ sessionKey: opts.sessionKey, config: cfg }); const resolvedSessionModel = resolveSessionModelRef(cfg, entry, sessionAgentId); + const max = Math.min(1000, typeof opts.limit === "number" ? opts.limit : 200); + const maxHistoryBytes = getMaxChatHistoryMessagesBytes(); const localMessages = sessionId && storePath - ? await readSessionMessagesAsync(sessionId, storePath, entry?.sessionFile) + ? await readSessionMessagesAsync(sessionId, storePath, entry?.sessionFile, { + mode: "recent", + maxMessages: max, + maxBytes: Math.max(maxHistoryBytes * 2, 1024 * 1024), + }) : []; const rawMessages = augmentChatHistoryWithCliSessionImports({ entry, provider: resolvedSessionModel.provider, localMessages, }); - const max = Math.min(1000, typeof opts.limit === "number" ? opts.limit : 200); const effectiveMaxChars = resolveEffectiveChatHistoryMaxChars(cfg); const normalized = augmentChatHistoryWithCanvasBlocks( projectRecentChatDisplayMessages(rawMessages, { @@ -213,7 +218,6 @@ export class EmbeddedTuiBackend implements TuiBackend { maxMessages: max, }), ); - const maxHistoryBytes = getMaxChatHistoryMessagesBytes(); const perMessageHardCap = Math.min(CHAT_HISTORY_MAX_SINGLE_MESSAGE_BYTES, maxHistoryBytes); const replaced = replaceOversizedChatHistoryMessages({ messages: normalized,