From aebf0bbd2da7edf6d4bb424bb9653b00cfbaa971 Mon Sep 17 00:00:00 2001 From: liuhao1024 Date: Tue, 16 Jun 2026 20:33:10 +0800 Subject: [PATCH] fix(gateway): compute sessions.usage aggregate totals from all sessions, not just the limited page (fixes #76496) (#93612) Merged via squash. Prepared head SHA: 349b8cd066755a6bc3f032338cb1981467203bce Co-authored-by: liuhao1024 <11816344+liuhao1024@users.noreply.github.com> Co-authored-by: vincentkoc <25068+vincentkoc@users.noreply.github.com> Reviewed-by: @vincentkoc --- .../usage.sessions-usage.test.ts | 77 +++++++++++++ src/gateway/server-methods/usage.ts | 102 +++++++++++++----- src/infra/session-cost-usage.test.ts | 40 +++++++ src/infra/session-cost-usage.ts | 90 ++++++++++++++++ ui/src/ui/views/usage.test.ts | 78 ++++++++++++++ ui/src/ui/views/usage.ts | 39 ++++--- 6 files changed, 387 insertions(+), 39 deletions(-) diff --git a/src/gateway/server-methods/usage.sessions-usage.test.ts b/src/gateway/server-methods/usage.sessions-usage.test.ts index 1a699c16def..e0fd74352a1 100644 --- a/src/gateway/server-methods/usage.sessions-usage.test.ts +++ b/src/gateway/server-methods/usage.sessions-usage.test.ts @@ -86,6 +86,15 @@ vi.mock("../../infra/session-cost-usage.js", async () => { staleFiles: 0, }, })), + loadSessionCostSummariesFromCache: vi.fn(async (params: { sessions: unknown[] }) => ({ + summaries: params.sessions.map(() => null), + cacheStatus: { + status: "fresh", + cachedFiles: params.sessions.length, + pendingFiles: 0, + staleFiles: 0, + }, + })), loadSessionUsageTimeSeries: vi.fn(async () => ({ sessionId: "s-opus", points: [], @@ -97,6 +106,7 @@ vi.mock("../../infra/session-cost-usage.js", async () => { import { discoverAllSessions, loadSessionCostSummaryFromCache, + loadSessionCostSummariesFromCache, loadSessionLogs, loadSessionUsageTimeSeries, } from "../../infra/session-cost-usage.js"; @@ -726,4 +736,71 @@ describe("sessions.usage", () => { ], ]); }); + + it("aggregate totals include all sessions even when limit restricts the page (#76496)", async () => { + // Override discoverAllSessions to return 3 sessions with distinct costs + vi.mocked(discoverAllSessions) + .mockResolvedValueOnce([ + { sessionId: "s-a", sessionFile: "/tmp/agents/main/sessions/s-a.jsonl", mtime: 300 }, + { sessionId: "s-b", sessionFile: "/tmp/agents/main/sessions/s-b.jsonl", mtime: 200 }, + { sessionId: "s-c", sessionFile: "/tmp/agents/main/sessions/s-c.jsonl", mtime: 100 }, + ]) + .mockResolvedValueOnce([]); // second agent (opus) — no extra sessions + + const buildUsage = (sessionId?: string) => { + const cost = sessionId === "s-a" ? 0.08 : sessionId === "s-b" ? 0.04 : 0.02; + const tokens = sessionId === "s-a" ? 15 : sessionId === "s-b" ? 10 : 5; + return { + input: tokens, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: tokens, + totalCost: cost, + inputCost: 0, + outputCost: 0, + cacheReadCost: 0, + cacheWriteCost: 0, + missingCostEntries: 0, + }; + }; + vi.mocked(loadSessionCostSummaryFromCache).mockImplementation(async ({ sessionId }) => ({ + summary: buildUsage(sessionId), + cacheStatus: { status: "fresh", cachedFiles: 1, pendingFiles: 0, staleFiles: 0 }, + })); + vi.mocked(loadSessionCostSummariesFromCache).mockImplementation(async ({ sessions }) => { + return { + summaries: sessions.map((session) => buildUsage(session.sessionId)), + cacheStatus: { + status: "fresh", + cachedFiles: sessions.length, + pendingFiles: 0, + staleFiles: 0, + }, + }; + }); + + const respond = await runSessionsUsage({ + ...BASE_USAGE_RANGE, + agentScope: "all", + limit: 1, + }); + + expect(respond).toHaveBeenCalledTimes(1); + expect(mockArg(respond, 0, 0)).toBe(true); + const result = mockArg(respond, 0, 1) as { + sessions: Array<{ key: string }>; + totals: { totalCost: number; totalTokens: number }; + }; + + // Only the most-recent session (s-a, mtime=300) appears in the page + expect(result.sessions).toHaveLength(1); + expect(result.sessions[0].key).toContain("s-a"); + expect(vi.mocked(loadSessionCostSummaryFromCache)).toHaveBeenCalledTimes(1); + expect(vi.mocked(loadSessionCostSummariesFromCache)).toHaveBeenCalledTimes(1); + + // But aggregate totals must include all 3 sessions (0.08 + 0.04 + 0.02 = 0.14) + expect(result.totals.totalCost).toBeCloseTo(0.14); + expect(result.totals.totalTokens).toBe(30); + }); }); diff --git a/src/gateway/server-methods/usage.ts b/src/gateway/server-methods/usage.ts index f9be5faa5ea..25a79c9c008 100644 --- a/src/gateway/server-methods/usage.ts +++ b/src/gateway/server-methods/usage.ts @@ -28,6 +28,7 @@ import { loadCostUsageSummaryFromCache, loadSessionLogs, loadSessionCostSummaryFromCache, + loadSessionCostSummariesFromCache, loadSessionUsageTimeSeries, discoverAllSessions, resolveExistingUsageSessionFile, @@ -1145,7 +1146,6 @@ export const usageHandlers: GatewayRequestHandlers = { // Sort by most recent first mergedEntries.sort((a, b) => b.updatedAt - a.updatedAt); - // Apply limit const limitedEntries = mergedEntries.slice(0, limit); // Load usage for each session @@ -1232,7 +1232,7 @@ export const usageHandlers: GatewayRequestHandlers = { }; const usageByEntryIndex: Array = Array.from( - { length: limitedEntries.length }, + { length: mergedEntries.length }, () => null, ); const usageLoadTasks: Array< @@ -1289,7 +1289,7 @@ export const usageHandlers: GatewayRequestHandlers = { if (!loaded.summary) { continue; } - const merged = limitedEntries[loaded.entryIndex]; + const merged = mergedEntries[loaded.entryIndex]; const usage = usageByEntryIndex[loaded.entryIndex] ?? createEmptySessionCostSummary(); usage.sessionId = merged.sessionId; usage.sessionFile = merged.sessionFile; @@ -1297,7 +1297,53 @@ export const usageHandlers: GatewayRequestHandlers = { usageByEntryIndex[loaded.entryIndex] = usage; } - for (const [entryIndex, merged] of limitedEntries.entries()) { + const hiddenSessionsByAgent = new Map< + string | undefined, + Array<{ entryIndex: number; sessionId: string; sessionFile: string }> + >(); + for (const [entryIndex, merged] of mergedEntries.entries()) { + if (entryIndex < limitedEntries.length) { + continue; + } + const hiddenSessions = hiddenSessionsByAgent.get(merged.agentId) ?? []; + for (const includedSessionId of merged.includedSessionIds ?? [merged.sessionId]) { + const sessionFile = + includedSessionId === merged.sessionId + ? merged.sessionFile + : resolveExistingUsageSessionFile({ + sessionId: includedSessionId, + agentId: merged.agentId, + }); + if (sessionFile) { + hiddenSessions.push({ entryIndex, sessionId: includedSessionId, sessionFile }); + } + } + hiddenSessionsByAgent.set(merged.agentId, hiddenSessions); + } + for (const [agentId, hiddenSessions] of hiddenSessionsByAgent) { + const hiddenUsage = await loadSessionCostSummariesFromCache({ + sessions: hiddenSessions, + config, + agentId, + startMs, + endMs, + }); + cacheStatus = mergeUsageCacheStatus(cacheStatus, hiddenUsage.cacheStatus); + for (const [hiddenIndex, summary] of hiddenUsage.summaries.entries()) { + if (!summary) { + continue; + } + const hiddenSession = hiddenSessions[hiddenIndex]; + const merged = mergedEntries[hiddenSession.entryIndex]; + const usage = usageByEntryIndex[hiddenSession.entryIndex] ?? createEmptySessionCostSummary(); + usage.sessionId = merged.sessionId; + usage.sessionFile = merged.sessionFile; + mergeSessionUsageInto(usage, summary); + usageByEntryIndex[hiddenSession.entryIndex] = usage; + } + } + + for (const [entryIndex, merged] of mergedEntries.entries()) { const agentId = merged.agentId; const usage = usageByEntryIndex[entryIndex]; @@ -1433,29 +1479,31 @@ export const usageHandlers: GatewayRequestHandlers = { } } - sessions.push({ - key: merged.key, - label: merged.label, - sessionId: merged.sessionId, - scope: merged.scope ?? "instance", - sessionFamilyKey: merged.sessionFamilyKey, - currentSessionId: merged.currentSessionId, - includedSessionIds: merged.includedSessionIds, - historicalInstanceCount: merged.includedSessionIds?.length, - updatedAt: merged.updatedAt, - agentId, - channel, - chatType, - origin: merged.storeEntry?.origin, - modelOverride: merged.storeEntry?.modelOverride, - providerOverride: merged.storeEntry?.providerOverride, - modelProvider: merged.storeEntry?.modelProvider, - model: merged.storeEntry?.model, - usage, - contextWeight: includeContextWeight - ? (merged.storeEntry?.systemPromptReport ?? null) - : undefined, - }); + if (entryIndex < limit) { + sessions.push({ + key: merged.key, + label: merged.label, + sessionId: merged.sessionId, + scope: merged.scope ?? "instance", + sessionFamilyKey: merged.sessionFamilyKey, + currentSessionId: merged.currentSessionId, + includedSessionIds: merged.includedSessionIds, + historicalInstanceCount: merged.includedSessionIds?.length, + updatedAt: merged.updatedAt, + agentId, + channel, + chatType, + origin: merged.storeEntry?.origin, + modelOverride: merged.storeEntry?.modelOverride, + providerOverride: merged.storeEntry?.providerOverride, + modelProvider: merged.storeEntry?.modelProvider, + model: merged.storeEntry?.model, + usage, + contextWeight: includeContextWeight + ? (merged.storeEntry?.systemPromptReport ?? null) + : undefined, + }); + } } // Format dates back to YYYY-MM-DD strings diff --git a/src/infra/session-cost-usage.test.ts b/src/infra/session-cost-usage.test.ts index 61c08077206..14914d18f9c 100644 --- a/src/infra/session-cost-usage.test.ts +++ b/src/infra/session-cost-usage.test.ts @@ -17,6 +17,7 @@ import { loadCostUsageSummaryFromCache, loadSessionCostSummary, loadSessionCostSummaryFromCache, + loadSessionCostSummariesFromCache, loadSessionLogs, loadSessionUsageTimeSeries, requestCostUsageCacheRefresh, @@ -346,6 +347,45 @@ describe("session cost usage", () => { }); }); + it("loads multiple session summaries from one durable cache snapshot", async () => { + const root = await makeSessionCostRoot("cost-cache-batch"); + const sessionsDir = path.join(root, "agents", "main", "sessions"); + await fs.mkdir(sessionsDir, { recursive: true }); + const sessionFiles = await Promise.all( + ["sess-a", "sess-b"].map(async (sessionId, index) => { + const sessionFile = path.join(sessionsDir, `${sessionId}.jsonl`); + await fs.writeFile( + sessionFile, + transcriptText(sessionId, { + type: "message", + timestamp: `2026-02-05T12:0${index}:00.000Z`, + message: { + role: "assistant", + provider: "openai", + model: "gpt-5.5", + usage: { input: index + 1, output: 0, totalTokens: index + 1 }, + }, + }), + "utf-8", + ); + return { sessionId, sessionFile }; + }), + ); + + await withStateDir(root, async () => { + await refreshCostUsageCache({ sessionFiles: sessionFiles.map((entry) => entry.sessionFile) }); + const result = await loadSessionCostSummariesFromCache({ + sessions: sessionFiles, + agentId: "main", + startMs: Date.UTC(2026, 1, 5), + endMs: Date.UTC(2026, 1, 5) + 24 * 60 * 60 * 1000 - 1, + }); + + expect(result.cacheStatus.status).toBe("fresh"); + expect(result.summaries.map((summary) => summary?.totalTokens)).toEqual([1, 2]); + }); + }); + it("ignores compaction checkpoint transcript snapshots in daily totals and discovery", async () => { const root = await makeSessionCostRoot("cost-checkpoint"); const sessionsDir = path.join(root, "agents", "main", "sessions"); diff --git a/src/infra/session-cost-usage.ts b/src/infra/session-cost-usage.ts index 0293d15ffe9..443ab5aa723 100644 --- a/src/infra/session-cost-usage.ts +++ b/src/infra/session-cost-usage.ts @@ -1797,6 +1797,96 @@ export async function loadSessionCostSummaryFromCache(params: { }; } +export async function loadSessionCostSummariesFromCache(params: { + sessions: Array<{ sessionId?: string; sessionFile: string }>; + config?: OpenClawConfig; + agentId?: string; + startMs?: number; + endMs?: number; + requestRefresh?: boolean; +}): Promise<{ summaries: Array; cacheStatus: UsageCacheStatus }> { + const cachePath = resolveUsageCostCachePath(params.agentId); + const pricingFingerprint = resolveUsageCostPricingFingerprint(params.config); + const statTasks = params.sessions.map( + (session) => async () => await fs.promises.stat(session.sessionFile).catch(() => null), + ); + const statsPromise = runTasksWithConcurrency({ + tasks: statTasks, + limit: USAGE_COST_TRANSCRIPT_STAT_CONCURRENCY, + }).then(({ results }) => results); + const [cache, stats, refreshRunning] = await Promise.all([ + readUsageCostCache(cachePath), + statsPromise, + isUsageCostCacheRefreshRunning(cachePath), + ]); + const staleFiles = new Set(); + let cachedFiles = 0; + const summaries = params.sessions.map((session, index) => { + const stat = stats[index]; + const file = stat + ? { filePath: session.sessionFile, size: stat.size, mtimeMs: stat.mtimeMs } + : undefined; + const entry = cache.files[session.sessionFile]; + const stale = + !file || + !isUsageCostCacheEntryFresh({ + entry, + file, + pricingFingerprint, + requireSessionSummary: true, + }); + if (stale) { + staleFiles.add(session.sessionFile); + return null; + } + cachedFiles += 1; + const summary = entry?.sessionSummary ?? null; + if ( + summary && + params.startMs !== undefined && + params.endMs !== undefined && + !isSessionSummaryContainedInRange(summary, params.startMs, params.endMs) + ) { + return entry + ? buildSessionCostSummaryFromCacheEntry({ + entry, + sessionId: session.sessionId, + sessionFile: session.sessionFile, + startMs: params.startMs, + endMs: params.endMs, + }) + : null; + } + return summary; + }); + const refreshRequested = params.requestRefresh !== false && staleFiles.size > 0; + if (refreshRequested) { + requestCostUsageCacheRefresh({ + config: params.config, + agentId: params.agentId, + sessionFiles: [...staleFiles], + }); + } + const staleFileCount = staleFiles.size; + return { + summaries, + cacheStatus: { + status: + staleFileCount === 0 + ? "fresh" + : refreshRunning || refreshRequested + ? "refreshing" + : cachedFiles > 0 + ? "partial" + : "stale", + cachedFiles, + pendingFiles: staleFileCount, + staleFiles: staleFileCount, + refreshedAt: cache.updatedAt || undefined, + }, + }; +} + export function requestCostUsageCacheRefresh(params?: { config?: OpenClawConfig; agentId?: string; diff --git a/ui/src/ui/views/usage.test.ts b/ui/src/ui/views/usage.test.ts index e89cba684e2..373e7f1875f 100644 --- a/ui/src/ui/views/usage.test.ts +++ b/ui/src/ui/views/usage.test.ts @@ -187,4 +187,82 @@ describe("renderUsage", () => { expect(container.textContent).toContain("agent:research:main"); expect(container.textContent).not.toContain("agent:main:main"); }); + + it("keeps session-derived insights scoped to the visible page when the page limit is hit", () => { + const container = document.createElement("div"); + + render( + renderUsage( + createUsageProps({ + data: { + ...createUsageProps().data, + sessionsLimitReached: true, + totals: { + input: 1_000, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 1_000, + totalCost: 10, + inputCost: 10, + outputCost: 0, + cacheReadCost: 0, + cacheWriteCost: 0, + missingCostEntries: 0, + }, + aggregates: { + messages: { + total: 100, + user: 50, + assistant: 50, + toolCalls: 0, + toolResults: 0, + errors: 0, + }, + tools: { totalCalls: 0, uniqueTools: 0, tools: [] }, + byModel: [], + byProvider: [], + byAgent: [], + byChannel: [], + daily: [], + }, + sessions: [ + { + key: "agent:main:visible", + agentId: "main", + lastUpdated: Date.now(), + usage: { + input: 10, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 10, + totalCost: 0.1, + inputCost: 0.1, + outputCost: 0, + cacheReadCost: 0, + cacheWriteCost: 0, + missingCostEntries: 0, + messageCounts: { + total: 2, + user: 1, + assistant: 1, + toolCalls: 0, + toolResults: 0, + errors: 0, + }, + }, + } as UsageProps["data"]["sessions"][number], + ], + }, + }), + ), + container, + ); + + const messagesValue = container.querySelector( + ".usage-overview-card .usage-summary-card--hero .usage-summary-value", + ); + expect(messagesValue?.textContent?.trim()).toBe("2"); + }); }); diff --git a/ui/src/ui/views/usage.ts b/ui/src/ui/views/usage.ts index 8e061e0d66f..cd980e13fd1 100644 --- a/ui/src/ui/views/usage.ts +++ b/ui/src/ui/views/usage.ts @@ -290,7 +290,22 @@ export function renderUsage(props: UsageProps) { : filters.selectedDays.length > 0 ? dayFilteredSessions : sortedSessions; - const activeAggregates = buildAggregatesFromSessions(aggregateSessions, data.aggregates); + const hasAggregateFilters = + filters.selectedSessions.length > 0 || + hasQuery || + filters.selectedHours.length > 0 || + filters.selectedDays.length > 0 || + Boolean(filters.agentId); + const activeAggregates = hasAggregateFilters + ? buildAggregatesFromSessions(aggregateSessions, data.aggregates) + : buildAggregatesFromSessions([], data.aggregates); + const insightsUseVisiblePage = data.sessionsLimitReached && !hasAggregateFilters; + const insightTotals = insightsUseVisiblePage + ? computeSessionTotals(aggregateSessions) + : displayTotals; + const insightAggregates = insightsUseVisiblePage + ? buildAggregatesFromSessions(aggregateSessions) + : activeAggregates; // Filter daily chart data if sessions are selected const filteredDaily = @@ -311,18 +326,18 @@ export function renderUsage(props: UsageProps) { })() : data.costDaily; - const insightStats = buildUsageInsightStats(aggregateSessions, displayTotals, activeAggregates); + const insightStats = buildUsageInsightStats(aggregateSessions, insightTotals, insightAggregates); const isEmpty = !data.loading && !data.totals && data.sessions.length === 0; const cacheStatusTitle = getUsageCacheRefreshTitle(data.cacheStatus); const hasMissingCost = - (displayTotals?.missingCostEntries ?? 0) > 0 || - (displayTotals - ? displayTotals.totalTokens > 0 && - displayTotals.totalCost === 0 && - displayTotals.input + - displayTotals.output + - displayTotals.cacheRead + - displayTotals.cacheWrite > + (insightTotals?.missingCostEntries ?? 0) > 0 || + (insightTotals + ? insightTotals.totalTokens > 0 && + insightTotals.totalCost === 0 && + insightTotals.input + + insightTotals.output + + insightTotals.cacheRead + + insightTotals.cacheWrite > 0 : false); const datePresets = [ @@ -790,8 +805,8 @@ export function renderUsage(props: UsageProps) { ? renderUsageEmptyState(filterActions.onRefresh) : html` ${renderUsageInsights( - displayTotals, - activeAggregates, + insightTotals, + insightAggregates, insightStats, hasMissingCost, buildPeakErrorHours(aggregateSessions, filters.timeZone),