fix(gateway): compute sessions.usage aggregate totals from all sessions, not just the limited page (fixes #76496) (#93612)

Merged via squash.

Prepared head SHA: 349b8cd066
Co-authored-by: liuhao1024 <11816344+liuhao1024@users.noreply.github.com>
Co-authored-by: vincentkoc <25068+vincentkoc@users.noreply.github.com>
Reviewed-by: @vincentkoc
This commit is contained in:
liuhao1024
2026-06-16 20:33:10 +08:00
committed by GitHub
parent e567986355
commit aebf0bbd2d
6 changed files with 387 additions and 39 deletions

View File

@@ -86,6 +86,15 @@ vi.mock("../../infra/session-cost-usage.js", async () => {
staleFiles: 0,
},
})),
loadSessionCostSummariesFromCache: vi.fn(async (params: { sessions: unknown[] }) => ({
summaries: params.sessions.map(() => null),
cacheStatus: {
status: "fresh",
cachedFiles: params.sessions.length,
pendingFiles: 0,
staleFiles: 0,
},
})),
loadSessionUsageTimeSeries: vi.fn(async () => ({
sessionId: "s-opus",
points: [],
@@ -97,6 +106,7 @@ vi.mock("../../infra/session-cost-usage.js", async () => {
import {
discoverAllSessions,
loadSessionCostSummaryFromCache,
loadSessionCostSummariesFromCache,
loadSessionLogs,
loadSessionUsageTimeSeries,
} from "../../infra/session-cost-usage.js";
@@ -726,4 +736,71 @@ describe("sessions.usage", () => {
],
]);
});
it("aggregate totals include all sessions even when limit restricts the page (#76496)", async () => {
// Override discoverAllSessions to return 3 sessions with distinct costs
vi.mocked(discoverAllSessions)
.mockResolvedValueOnce([
{ sessionId: "s-a", sessionFile: "/tmp/agents/main/sessions/s-a.jsonl", mtime: 300 },
{ sessionId: "s-b", sessionFile: "/tmp/agents/main/sessions/s-b.jsonl", mtime: 200 },
{ sessionId: "s-c", sessionFile: "/tmp/agents/main/sessions/s-c.jsonl", mtime: 100 },
])
.mockResolvedValueOnce([]); // second agent (opus) — no extra sessions
const buildUsage = (sessionId?: string) => {
const cost = sessionId === "s-a" ? 0.08 : sessionId === "s-b" ? 0.04 : 0.02;
const tokens = sessionId === "s-a" ? 15 : sessionId === "s-b" ? 10 : 5;
return {
input: tokens,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: tokens,
totalCost: cost,
inputCost: 0,
outputCost: 0,
cacheReadCost: 0,
cacheWriteCost: 0,
missingCostEntries: 0,
};
};
vi.mocked(loadSessionCostSummaryFromCache).mockImplementation(async ({ sessionId }) => ({
summary: buildUsage(sessionId),
cacheStatus: { status: "fresh", cachedFiles: 1, pendingFiles: 0, staleFiles: 0 },
}));
vi.mocked(loadSessionCostSummariesFromCache).mockImplementation(async ({ sessions }) => {
return {
summaries: sessions.map((session) => buildUsage(session.sessionId)),
cacheStatus: {
status: "fresh",
cachedFiles: sessions.length,
pendingFiles: 0,
staleFiles: 0,
},
};
});
const respond = await runSessionsUsage({
...BASE_USAGE_RANGE,
agentScope: "all",
limit: 1,
});
expect(respond).toHaveBeenCalledTimes(1);
expect(mockArg(respond, 0, 0)).toBe(true);
const result = mockArg(respond, 0, 1) as {
sessions: Array<{ key: string }>;
totals: { totalCost: number; totalTokens: number };
};
// Only the most-recent session (s-a, mtime=300) appears in the page
expect(result.sessions).toHaveLength(1);
expect(result.sessions[0].key).toContain("s-a");
expect(vi.mocked(loadSessionCostSummaryFromCache)).toHaveBeenCalledTimes(1);
expect(vi.mocked(loadSessionCostSummariesFromCache)).toHaveBeenCalledTimes(1);
// But aggregate totals must include all 3 sessions (0.08 + 0.04 + 0.02 = 0.14)
expect(result.totals.totalCost).toBeCloseTo(0.14);
expect(result.totals.totalTokens).toBe(30);
});
});

View File

@@ -28,6 +28,7 @@ import {
loadCostUsageSummaryFromCache,
loadSessionLogs,
loadSessionCostSummaryFromCache,
loadSessionCostSummariesFromCache,
loadSessionUsageTimeSeries,
discoverAllSessions,
resolveExistingUsageSessionFile,
@@ -1145,7 +1146,6 @@ export const usageHandlers: GatewayRequestHandlers = {
// Sort by most recent first
mergedEntries.sort((a, b) => b.updatedAt - a.updatedAt);
// Apply limit
const limitedEntries = mergedEntries.slice(0, limit);
// Load usage for each session
@@ -1232,7 +1232,7 @@ export const usageHandlers: GatewayRequestHandlers = {
};
const usageByEntryIndex: Array<SessionCostSummary | null> = Array.from(
{ length: limitedEntries.length },
{ length: mergedEntries.length },
() => null,
);
const usageLoadTasks: Array<
@@ -1289,7 +1289,7 @@ export const usageHandlers: GatewayRequestHandlers = {
if (!loaded.summary) {
continue;
}
const merged = limitedEntries[loaded.entryIndex];
const merged = mergedEntries[loaded.entryIndex];
const usage = usageByEntryIndex[loaded.entryIndex] ?? createEmptySessionCostSummary();
usage.sessionId = merged.sessionId;
usage.sessionFile = merged.sessionFile;
@@ -1297,7 +1297,53 @@ export const usageHandlers: GatewayRequestHandlers = {
usageByEntryIndex[loaded.entryIndex] = usage;
}
for (const [entryIndex, merged] of limitedEntries.entries()) {
const hiddenSessionsByAgent = new Map<
string | undefined,
Array<{ entryIndex: number; sessionId: string; sessionFile: string }>
>();
for (const [entryIndex, merged] of mergedEntries.entries()) {
if (entryIndex < limitedEntries.length) {
continue;
}
const hiddenSessions = hiddenSessionsByAgent.get(merged.agentId) ?? [];
for (const includedSessionId of merged.includedSessionIds ?? [merged.sessionId]) {
const sessionFile =
includedSessionId === merged.sessionId
? merged.sessionFile
: resolveExistingUsageSessionFile({
sessionId: includedSessionId,
agentId: merged.agentId,
});
if (sessionFile) {
hiddenSessions.push({ entryIndex, sessionId: includedSessionId, sessionFile });
}
}
hiddenSessionsByAgent.set(merged.agentId, hiddenSessions);
}
for (const [agentId, hiddenSessions] of hiddenSessionsByAgent) {
const hiddenUsage = await loadSessionCostSummariesFromCache({
sessions: hiddenSessions,
config,
agentId,
startMs,
endMs,
});
cacheStatus = mergeUsageCacheStatus(cacheStatus, hiddenUsage.cacheStatus);
for (const [hiddenIndex, summary] of hiddenUsage.summaries.entries()) {
if (!summary) {
continue;
}
const hiddenSession = hiddenSessions[hiddenIndex];
const merged = mergedEntries[hiddenSession.entryIndex];
const usage = usageByEntryIndex[hiddenSession.entryIndex] ?? createEmptySessionCostSummary();
usage.sessionId = merged.sessionId;
usage.sessionFile = merged.sessionFile;
mergeSessionUsageInto(usage, summary);
usageByEntryIndex[hiddenSession.entryIndex] = usage;
}
}
for (const [entryIndex, merged] of mergedEntries.entries()) {
const agentId = merged.agentId;
const usage = usageByEntryIndex[entryIndex];
@@ -1433,29 +1479,31 @@ export const usageHandlers: GatewayRequestHandlers = {
}
}
sessions.push({
key: merged.key,
label: merged.label,
sessionId: merged.sessionId,
scope: merged.scope ?? "instance",
sessionFamilyKey: merged.sessionFamilyKey,
currentSessionId: merged.currentSessionId,
includedSessionIds: merged.includedSessionIds,
historicalInstanceCount: merged.includedSessionIds?.length,
updatedAt: merged.updatedAt,
agentId,
channel,
chatType,
origin: merged.storeEntry?.origin,
modelOverride: merged.storeEntry?.modelOverride,
providerOverride: merged.storeEntry?.providerOverride,
modelProvider: merged.storeEntry?.modelProvider,
model: merged.storeEntry?.model,
usage,
contextWeight: includeContextWeight
? (merged.storeEntry?.systemPromptReport ?? null)
: undefined,
});
if (entryIndex < limit) {
sessions.push({
key: merged.key,
label: merged.label,
sessionId: merged.sessionId,
scope: merged.scope ?? "instance",
sessionFamilyKey: merged.sessionFamilyKey,
currentSessionId: merged.currentSessionId,
includedSessionIds: merged.includedSessionIds,
historicalInstanceCount: merged.includedSessionIds?.length,
updatedAt: merged.updatedAt,
agentId,
channel,
chatType,
origin: merged.storeEntry?.origin,
modelOverride: merged.storeEntry?.modelOverride,
providerOverride: merged.storeEntry?.providerOverride,
modelProvider: merged.storeEntry?.modelProvider,
model: merged.storeEntry?.model,
usage,
contextWeight: includeContextWeight
? (merged.storeEntry?.systemPromptReport ?? null)
: undefined,
});
}
}
// Format dates back to YYYY-MM-DD strings

View File

@@ -17,6 +17,7 @@ import {
loadCostUsageSummaryFromCache,
loadSessionCostSummary,
loadSessionCostSummaryFromCache,
loadSessionCostSummariesFromCache,
loadSessionLogs,
loadSessionUsageTimeSeries,
requestCostUsageCacheRefresh,
@@ -346,6 +347,45 @@ describe("session cost usage", () => {
});
});
it("loads multiple session summaries from one durable cache snapshot", async () => {
const root = await makeSessionCostRoot("cost-cache-batch");
const sessionsDir = path.join(root, "agents", "main", "sessions");
await fs.mkdir(sessionsDir, { recursive: true });
const sessionFiles = await Promise.all(
["sess-a", "sess-b"].map(async (sessionId, index) => {
const sessionFile = path.join(sessionsDir, `${sessionId}.jsonl`);
await fs.writeFile(
sessionFile,
transcriptText(sessionId, {
type: "message",
timestamp: `2026-02-05T12:0${index}:00.000Z`,
message: {
role: "assistant",
provider: "openai",
model: "gpt-5.5",
usage: { input: index + 1, output: 0, totalTokens: index + 1 },
},
}),
"utf-8",
);
return { sessionId, sessionFile };
}),
);
await withStateDir(root, async () => {
await refreshCostUsageCache({ sessionFiles: sessionFiles.map((entry) => entry.sessionFile) });
const result = await loadSessionCostSummariesFromCache({
sessions: sessionFiles,
agentId: "main",
startMs: Date.UTC(2026, 1, 5),
endMs: Date.UTC(2026, 1, 5) + 24 * 60 * 60 * 1000 - 1,
});
expect(result.cacheStatus.status).toBe("fresh");
expect(result.summaries.map((summary) => summary?.totalTokens)).toEqual([1, 2]);
});
});
it("ignores compaction checkpoint transcript snapshots in daily totals and discovery", async () => {
const root = await makeSessionCostRoot("cost-checkpoint");
const sessionsDir = path.join(root, "agents", "main", "sessions");

View File

@@ -1797,6 +1797,96 @@ export async function loadSessionCostSummaryFromCache(params: {
};
}
export async function loadSessionCostSummariesFromCache(params: {
sessions: Array<{ sessionId?: string; sessionFile: string }>;
config?: OpenClawConfig;
agentId?: string;
startMs?: number;
endMs?: number;
requestRefresh?: boolean;
}): Promise<{ summaries: Array<SessionCostSummary | null>; cacheStatus: UsageCacheStatus }> {
const cachePath = resolveUsageCostCachePath(params.agentId);
const pricingFingerprint = resolveUsageCostPricingFingerprint(params.config);
const statTasks = params.sessions.map(
(session) => async () => await fs.promises.stat(session.sessionFile).catch(() => null),
);
const statsPromise = runTasksWithConcurrency({
tasks: statTasks,
limit: USAGE_COST_TRANSCRIPT_STAT_CONCURRENCY,
}).then(({ results }) => results);
const [cache, stats, refreshRunning] = await Promise.all([
readUsageCostCache(cachePath),
statsPromise,
isUsageCostCacheRefreshRunning(cachePath),
]);
const staleFiles = new Set<string>();
let cachedFiles = 0;
const summaries = params.sessions.map((session, index) => {
const stat = stats[index];
const file = stat
? { filePath: session.sessionFile, size: stat.size, mtimeMs: stat.mtimeMs }
: undefined;
const entry = cache.files[session.sessionFile];
const stale =
!file ||
!isUsageCostCacheEntryFresh({
entry,
file,
pricingFingerprint,
requireSessionSummary: true,
});
if (stale) {
staleFiles.add(session.sessionFile);
return null;
}
cachedFiles += 1;
const summary = entry?.sessionSummary ?? null;
if (
summary &&
params.startMs !== undefined &&
params.endMs !== undefined &&
!isSessionSummaryContainedInRange(summary, params.startMs, params.endMs)
) {
return entry
? buildSessionCostSummaryFromCacheEntry({
entry,
sessionId: session.sessionId,
sessionFile: session.sessionFile,
startMs: params.startMs,
endMs: params.endMs,
})
: null;
}
return summary;
});
const refreshRequested = params.requestRefresh !== false && staleFiles.size > 0;
if (refreshRequested) {
requestCostUsageCacheRefresh({
config: params.config,
agentId: params.agentId,
sessionFiles: [...staleFiles],
});
}
const staleFileCount = staleFiles.size;
return {
summaries,
cacheStatus: {
status:
staleFileCount === 0
? "fresh"
: refreshRunning || refreshRequested
? "refreshing"
: cachedFiles > 0
? "partial"
: "stale",
cachedFiles,
pendingFiles: staleFileCount,
staleFiles: staleFileCount,
refreshedAt: cache.updatedAt || undefined,
},
};
}
export function requestCostUsageCacheRefresh(params?: {
config?: OpenClawConfig;
agentId?: string;

View File

@@ -187,4 +187,82 @@ describe("renderUsage", () => {
expect(container.textContent).toContain("agent:research:main");
expect(container.textContent).not.toContain("agent:main:main");
});
it("keeps session-derived insights scoped to the visible page when the page limit is hit", () => {
const container = document.createElement("div");
render(
renderUsage(
createUsageProps({
data: {
...createUsageProps().data,
sessionsLimitReached: true,
totals: {
input: 1_000,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 1_000,
totalCost: 10,
inputCost: 10,
outputCost: 0,
cacheReadCost: 0,
cacheWriteCost: 0,
missingCostEntries: 0,
},
aggregates: {
messages: {
total: 100,
user: 50,
assistant: 50,
toolCalls: 0,
toolResults: 0,
errors: 0,
},
tools: { totalCalls: 0, uniqueTools: 0, tools: [] },
byModel: [],
byProvider: [],
byAgent: [],
byChannel: [],
daily: [],
},
sessions: [
{
key: "agent:main:visible",
agentId: "main",
lastUpdated: Date.now(),
usage: {
input: 10,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 10,
totalCost: 0.1,
inputCost: 0.1,
outputCost: 0,
cacheReadCost: 0,
cacheWriteCost: 0,
missingCostEntries: 0,
messageCounts: {
total: 2,
user: 1,
assistant: 1,
toolCalls: 0,
toolResults: 0,
errors: 0,
},
},
} as UsageProps["data"]["sessions"][number],
],
},
}),
),
container,
);
const messagesValue = container.querySelector(
".usage-overview-card .usage-summary-card--hero .usage-summary-value",
);
expect(messagesValue?.textContent?.trim()).toBe("2");
});
});

View File

@@ -290,7 +290,22 @@ export function renderUsage(props: UsageProps) {
: filters.selectedDays.length > 0
? dayFilteredSessions
: sortedSessions;
const activeAggregates = buildAggregatesFromSessions(aggregateSessions, data.aggregates);
const hasAggregateFilters =
filters.selectedSessions.length > 0 ||
hasQuery ||
filters.selectedHours.length > 0 ||
filters.selectedDays.length > 0 ||
Boolean(filters.agentId);
const activeAggregates = hasAggregateFilters
? buildAggregatesFromSessions(aggregateSessions, data.aggregates)
: buildAggregatesFromSessions([], data.aggregates);
const insightsUseVisiblePage = data.sessionsLimitReached && !hasAggregateFilters;
const insightTotals = insightsUseVisiblePage
? computeSessionTotals(aggregateSessions)
: displayTotals;
const insightAggregates = insightsUseVisiblePage
? buildAggregatesFromSessions(aggregateSessions)
: activeAggregates;
// Filter daily chart data if sessions are selected
const filteredDaily =
@@ -311,18 +326,18 @@ export function renderUsage(props: UsageProps) {
})()
: data.costDaily;
const insightStats = buildUsageInsightStats(aggregateSessions, displayTotals, activeAggregates);
const insightStats = buildUsageInsightStats(aggregateSessions, insightTotals, insightAggregates);
const isEmpty = !data.loading && !data.totals && data.sessions.length === 0;
const cacheStatusTitle = getUsageCacheRefreshTitle(data.cacheStatus);
const hasMissingCost =
(displayTotals?.missingCostEntries ?? 0) > 0 ||
(displayTotals
? displayTotals.totalTokens > 0 &&
displayTotals.totalCost === 0 &&
displayTotals.input +
displayTotals.output +
displayTotals.cacheRead +
displayTotals.cacheWrite >
(insightTotals?.missingCostEntries ?? 0) > 0 ||
(insightTotals
? insightTotals.totalTokens > 0 &&
insightTotals.totalCost === 0 &&
insightTotals.input +
insightTotals.output +
insightTotals.cacheRead +
insightTotals.cacheWrite >
0
: false);
const datePresets = [
@@ -790,8 +805,8 @@ export function renderUsage(props: UsageProps) {
? renderUsageEmptyState(filterActions.onRefresh)
: html`
${renderUsageInsights(
displayTotals,
activeAggregates,
insightTotals,
insightAggregates,
insightStats,
hasMissingCost,
buildPeakErrorHours(aggregateSessions, filters.timeZone),