From 2908190ba208e599bfc53f984c77c122fda1eff9 Mon Sep 17 00:00:00 2001
From: Peter Steinberger
Date: Thu, 23 Apr 2026 04:16:57 +0100
Subject: [PATCH] fix(agents): recover streamed timings usage (#41056) (thanks
 @xaeon2026)

---
 CHANGELOG.md                                  |  1 +
 ...bedded-subscribe.handlers.messages.test.ts |  3 +
 ...pi-embedded-subscribe.handlers.messages.ts |  8 +++
 .../pi-embedded-subscribe.handlers.types.ts   |  3 +
 ...session.subscribeembeddedpisession.test.ts | 71 +++++++++++++++++++
 src/agents/pi-embedded-subscribe.ts           | 53 +++++++++++++-
 src/agents/usage.normalization.test.ts        | 47 ++++++++++++
 src/agents/usage.ts                           | 40 +++++++++--
 8 files changed, 216 insertions(+), 10 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d2413e424a5..364b37d2124 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,7 @@ Docs: https://docs.openclaw.ai
 - Providers/Amazon Bedrock Mantle: add Claude Opus 4.7 through Mantle's Anthropic Messages route with provider-owned bearer-auth streaming, so the model is actually callable without treating AWS bearer tokens like Anthropic API keys. Thanks @wirjo.
 - Providers/OpenAI Codex: remove the Codex CLI auth import path from onboarding and provider discovery so OpenClaw no longer copies `~/.codex` OAuth material into agent auth stores; use browser login or device pairing instead. (#70390) Thanks @pashpashpash.
 - Providers/OpenAI-compatible: mark known local backends such as vLLM, SGLang, llama.cpp, LM Studio, LocalAI, Jan, TabbyAPI, and text-generation-webui as streaming-usage compatible, so their token accounting no longer degrades to unknown/stale totals. (#68711) Thanks @gaineyllc.
+- Providers/OpenAI-compatible: recover streamed token usage from llama.cpp-style `timings.prompt_n` / `timings.predicted_n` metadata and sanitize usage counts before accumulation, fixing unknown or stale totals when compatible servers do not emit an OpenAI-shaped `usage` object. (#41056) Thanks @xaeon2026.
 - OpenAI/Responses: use OpenAI's native `web_search` tool automatically for direct OpenAI Responses models when web search is enabled and no managed search provider is pinned; explicit providers such as Brave keep the managed `web_search` tool.
 - ACPX: add an explicit `openClawToolsMcpBridge` option that injects a core OpenClaw MCP server for selected built-in tools, starting with `cron`.
 - Agents/sessions: add mailbox-style `sessions_list` filters for label, agent, and search plus visibility-scoped derived title and last-message previews. (#69839) Thanks @dangoZhang.
diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.test.ts b/src/agents/pi-embedded-subscribe.handlers.messages.test.ts
index 37151f0fbba..a79a2bff95c 100644
--- a/src/agents/pi-embedded-subscribe.handlers.messages.test.ts
+++ b/src/agents/pi-embedded-subscribe.handlers.messages.test.ts
@@ -68,6 +68,8 @@ function createMessageUpdateContext(
     emitReasoningStream: vi.fn(),
     flushBlockReplyBuffer: params.flushBlockReplyBuffer ?? vi.fn(),
     resetAssistantMessageState: params.resetAssistantMessageState ?? vi.fn(),
+    recordAssistantUsage: vi.fn(),
+    commitAssistantUsage: vi.fn(),
   } as unknown as EmbeddedPiSubscribeContext;
 }
 
@@ -114,6 +116,7 @@ function createMessageEndContext(
     },
     noteLastAssistant: vi.fn(),
     recordAssistantUsage: vi.fn(),
+    commitAssistantUsage: vi.fn(),
     log: { debug: vi.fn(), warn: vi.fn() },
     stripBlockTags: (text: string) => text,
     finalizeAssistantTexts: params.finalizeAssistantTexts ?? vi.fn(),
diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.ts b/src/agents/pi-embedded-subscribe.handlers.messages.ts
index 11859f7d62d..a536a6b1fc5 100644
--- a/src/agents/pi-embedded-subscribe.handlers.messages.ts
+++ b/src/agents/pi-embedded-subscribe.handlers.messages.ts
@@ -389,6 +389,13 @@ export function handleMessageUpdate(
     : undefined;
   const evtType = typeof assistantRecord?.type === "string" ? assistantRecord.type : "";
 
+  if (evtType === "text_end" || evtType === "done" || evtType === "error") {
+    ctx.recordAssistantUsage(assistantRecord);
+    if (evtType === "done" || evtType === "error") {
+      ctx.commitAssistantUsage();
+    }
+  }
+
   if (evtType === "thinking_start" || evtType === "thinking_delta" || evtType === "thinking_end") {
     if (evtType === "thinking_start" || evtType === "thinking_delta") {
       openReasoningStream(ctx);
@@ -613,6 +620,7 @@ export function handleMessageEnd(
   const suppressDeterministicApprovalOutput = shouldSuppressDeterministicApprovalOutput(ctx.state);
   ctx.noteLastAssistant(assistantMessage);
   ctx.recordAssistantUsage((assistantMessage as { usage?: unknown }).usage);
+  ctx.commitAssistantUsage();
   if (suppressVisibleAssistantOutput) {
     return;
   }
diff --git a/src/agents/pi-embedded-subscribe.handlers.types.ts b/src/agents/pi-embedded-subscribe.handlers.types.ts
index f541578ec23..95848ba061d 100644
--- a/src/agents/pi-embedded-subscribe.handlers.types.ts
+++ b/src/agents/pi-embedded-subscribe.handlers.types.ts
@@ -60,6 +60,8 @@ export type EmbeddedPiSubscribeState = {
   assistantTextBaseline: number;
   suppressBlockChunks: boolean;
   lastReasoningSent?: string;
+  pendingAssistantUsage?: NormalizedUsage;
+  assistantUsageCommitted: boolean;
 
   compactionInFlight: boolean;
   pendingCompactionRetry: number;
@@ -133,6 +135,7 @@ export type EmbeddedPiSubscribeContext = {
   resolveCompactionRetry: () => void;
   maybeResolveCompactionWait: () => void;
   recordAssistantUsage: (usage: unknown) => void;
+  commitAssistantUsage: () => void;
   incrementCompactionCount: () => void;
   getUsageTotals: () => NormalizedUsage | undefined;
   getCompactionCount: () => number;
diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts
index 58ef500b83f..74317cf0150 100644
--- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts
+++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts
@@ -2,6 +2,7 @@ import type { AssistantMessage } from "@mariozechner/pi-ai";
 import { describe, expect, it, vi } from "vitest";
 import {
   THINKING_TAG_CASES,
+  createSubscribedSessionHarness,
   createStubSessionHarness,
   emitAssistantLifecycleErrorAndEnd,
   emitMessageStartAndEndForAssistantText,
@@ -10,6 +11,7 @@ import {
   findLifecycleErrorAgentEvent,
 } from "./pi-embedded-subscribe.e2e-harness.js";
 import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js";
+import { makeZeroUsageSnapshot } from "./usage.js";
 
 describe("subscribeEmbeddedPiSession", () => {
   async function flushBlockReplyCallbacks(): Promise<void> {
@@ -109,6 +111,75 @@ describe("subscribeEmbeddedPiSession", () => {
     });
   }
 
+  it("captures usage from completions timings on done events", () => {
+    const { emit, subscription } = createSubscribedSessionHarness({ runId: "run" });
+
+    emit({ type: "message_start", message: { role: "assistant" } });
+    emit({
+      type: "message_update",
+      message: { role: "assistant" },
+      assistantMessageEvent: {
+        type: "done",
+        timings: {
+          prompt_n: 30_834,
+          predicted_n: 34,
+        },
+      },
+    });
+    emit({
+      type: "message_end",
+      message: {
+        role: "assistant",
+        usage: makeZeroUsageSnapshot(),
+      },
+    });
+
+    expect(subscription.getUsageTotals()).toEqual({
+      input: 30_834,
+      output: 34,
+      cacheRead: undefined,
+      cacheWrite: undefined,
+      total: 30_868,
+    });
+  });
+
+  it("does not double-count usage when done and message_end carry the same snapshot", () => {
+    const { emit, subscription } = createSubscribedSessionHarness({ runId: "run" });
+    const usage = {
+      input: 100,
+      output: 20,
+      totalTokens: 120,
+    };
+
+    emit({ type: "message_start", message: { role: "assistant" } });
+    emit({
+      type: "message_update",
+      message: { role: "assistant" },
+      assistantMessageEvent: {
+        type: "done",
+        message: {
+          role: "assistant",
+          usage,
+        },
+      },
+    });
+    emit({
+      type: "message_end",
+      message: {
+        role: "assistant",
+        usage,
+      },
+    });
+
+    expect(subscription.getUsageTotals()).toEqual({
+      input: 100,
+      output: 20,
+      cacheRead: undefined,
+      cacheWrite: undefined,
+      total: 120,
+    });
+  });
+
   it.each(THINKING_TAG_CASES)(
     "streams <%s> reasoning via onReasoningStream without leaking into final text",
     async ({ open, close }) => {
diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts
index c17376de978..e074dc15671 100644
--- a/src/agents/pi-embedded-subscribe.ts
+++ b/src/agents/pi-embedded-subscribe.ts
@@ -107,6 +107,8 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
     assistantTextBaseline: 0,
     suppressBlockChunks: false, // Avoid late chunk inserts after final text merge.
     lastReasoningSent: undefined,
+    pendingAssistantUsage: undefined,
+    assistantUsageCommitted: false,
     compactionInFlight: false,
     pendingCompactionRetry: 0,
     compactionRetryResolve: undefined,
@@ -212,6 +214,8 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
     state.lastReasoningSent = undefined;
     state.reasoningStreamOpen = false;
     state.suppressBlockChunks = false;
+    state.pendingAssistantUsage = undefined;
+    state.assistantUsageCommitted = false;
     state.assistantMessageIndex += 1;
     state.lastAssistantStreamItemId = undefined;
     state.lastAssistantTextMessageIndex = -1;
@@ -352,11 +356,42 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
   const maybeResolveCompactionWait = () => {
     resolveCompactionPromiseIfIdle();
   };
-  const recordAssistantUsage = (usageLike: unknown) => {
-    const usage = normalizeUsage((usageLike ?? undefined) as UsageLike | undefined);
-    if (!hasNonzeroUsage(usage)) {
+  const resolveAssistantUsage = (usageLike: unknown) => {
+    const candidates: unknown[] = [usageLike];
+    if (usageLike && typeof usageLike === "object") {
+      const record = usageLike as Record<string, unknown>;
+      const partial =
+        record.partial && typeof record.partial === "object"
+          ? (record.partial as Record<string, unknown>)
+          : undefined;
+      const message =
+        record.message && typeof record.message === "object"
+          ? (record.message as Record<string, unknown>)
+          : undefined;
+      candidates.push(
+        record.usage,
+        record.timings,
+        record.partial,
+        record.message,
+        partial?.usage,
+        partial?.timings,
+        message?.usage,
+        message?.timings,
+      );
+    }
+    for (const candidate of candidates) {
+      const usage = normalizeUsage((candidate ?? undefined) as UsageLike | undefined);
+      if (hasNonzeroUsage(usage)) {
+        return usage;
+      }
+    }
+    return undefined;
+  };
+  const commitAssistantUsage = () => {
+    if (state.assistantUsageCommitted || !state.pendingAssistantUsage) {
       return;
     }
+    const usage = state.pendingAssistantUsage;
     usageTotals.input += usage.input ?? 0;
     usageTotals.output += usage.output ?? 0;
     usageTotals.cacheRead += usage.cacheRead ?? 0;
@@ -365,6 +400,17 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
       usage.total ??
       (usage.input ?? 0) + (usage.output ?? 0) + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0);
     usageTotals.total += usageTotal;
+    state.assistantUsageCommitted = true;
+  };
+  const recordAssistantUsage = (usageLike: unknown) => {
+    if (state.assistantUsageCommitted) {
+      return;
+    }
+    const usage = resolveAssistantUsage(usageLike);
+    if (!usage) {
+      return;
+    }
+    state.pendingAssistantUsage = usage;
   };
   const getUsageTotals = () => {
     const hasUsage =
@@ -759,6 +805,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
     resolveCompactionRetry,
     maybeResolveCompactionWait,
     recordAssistantUsage,
+    commitAssistantUsage,
     incrementCompactionCount,
     getUsageTotals,
     getCompactionCount: () => compactionCount,
diff --git a/src/agents/usage.normalization.test.ts b/src/agents/usage.normalization.test.ts
index d3ebbe70daf..af0db15d3b4 100644
--- a/src/agents/usage.normalization.test.ts
+++ b/src/agents/usage.normalization.test.ts
@@ -34,6 +34,53 @@ describe("normalizeUsage", () => {
     });
   });
 
+  it("normalizes llama.cpp completion timings", () => {
+    const usage = normalizeUsage({
+      timings: {
+        prompt_n: 30_834,
+        predicted_n: 34,
+      },
+    });
+    expect(usage).toEqual({
+      input: 30_834,
+      output: 34,
+      cacheRead: undefined,
+      cacheWrite: undefined,
+      total: undefined,
+    });
+  });
+
+  it("clamps negative and fractional usage counts to safe integers", () => {
+    const usage = normalizeUsage({
+      input: -12.8,
+      output: 9.9,
+      cacheRead: -1,
+      cacheWrite: 3.2,
+      total: -99,
+    });
+    expect(usage).toEqual({
+      input: 0,
+      output: 9,
+      cacheRead: 0,
+      cacheWrite: 3,
+      total: 0,
+    });
+  });
+
+  it("caps extremely large usage counts at Number.MAX_SAFE_INTEGER", () => {
+    const usage = normalizeUsage({
+      input: 1e308,
+      output: Number.MAX_SAFE_INTEGER + 1000,
+    });
+    expect(usage).toEqual({
+      input: Number.MAX_SAFE_INTEGER,
+      output: Number.MAX_SAFE_INTEGER,
+      cacheRead: undefined,
+      cacheWrite: undefined,
+      total: undefined,
+    });
+  });
+
   it("returns undefined for empty usage objects", () => {
     expect(normalizeUsage({})).toBeUndefined();
   });
diff --git a/src/agents/usage.ts b/src/agents/usage.ts
index dbe1e272a0a..4f437063585 100644
--- a/src/agents/usage.ts
+++ b/src/agents/usage.ts
@@ -28,6 +28,13 @@ export type UsageLike = {
   total_tokens?: number;
   cache_read?: number;
   cache_write?: number;
+  // llama.cpp-style streamed completion metadata.
+  prompt_n?: number;
+  predicted_n?: number;
+  timings?: {
+    prompt_n?: number;
+    predicted_n?: number;
+  };
 };
 
 export type NormalizedUsage = {
@@ -79,12 +86,23 @@ export function hasNonzeroUsage(usage?: NormalizedUsage | null): usage is Normal
   );
 }
 
+const normalizeTokenCount = (value: unknown): number | undefined => {
+  const numeric = asFiniteNumber(value);
+  if (numeric === undefined) {
+    return undefined;
+  }
+  if (numeric <= 0) {
+    return 0;
+  }
+  return Math.min(Math.trunc(numeric), Number.MAX_SAFE_INTEGER);
+};
+
 export function normalizeUsage(raw?: UsageLike | null): NormalizedUsage | undefined {
   if (!raw) {
     return undefined;
   }
-  const cacheRead = asFiniteNumber(
+  const cacheRead = normalizeTokenCount(
     raw.cacheRead ??
       raw.cache_read ??
       raw.cache_read_input_tokens ??
@@ -94,7 +112,13 @@
   );
 
   const rawInputValue =
-    raw.input ?? raw.inputTokens ?? raw.input_tokens ?? raw.promptTokens ?? raw.prompt_tokens;
+    raw.input ??
+    raw.inputTokens ??
+    raw.input_tokens ??
+    raw.promptTokens ??
+    raw.prompt_tokens ??
+    raw.prompt_n ??
+    raw.timings?.prompt_n;
 
   const usesOpenAIStylePromptTotals =
     raw.cached_tokens !== undefined ||
@@ -111,18 +135,20 @@
       rawInput !== undefined && usesOpenAIStylePromptTotals && cacheRead !== undefined
         ? rawInput - cacheRead
         : rawInput;
-  const input = normalizedInput !== undefined && normalizedInput < 0 ? 0 : normalizedInput;
-  const output = asFiniteNumber(
+  const input = normalizeTokenCount(normalizedInput);
+  const output = normalizeTokenCount(
     raw.output ??
       raw.outputTokens ??
       raw.output_tokens ??
       raw.completionTokens ??
-      raw.completion_tokens,
+      raw.completion_tokens ??
+      raw.predicted_n ??
+      raw.timings?.predicted_n,
   );
-  const cacheWrite = asFiniteNumber(
+  const cacheWrite = normalizeTokenCount(
     raw.cacheWrite ?? raw.cache_write ?? raw.cache_creation_input_tokens,
   );
-  const total = asFiniteNumber(raw.total ?? raw.totalTokens ?? raw.total_tokens);
+  const total = normalizeTokenCount(raw.total ?? raw.totalTokens ?? raw.total_tokens);
 
   if (
     input === undefined &&
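
A minimal standalone sketch of the recovery path this patch adds, for trying the fallback outside the test harness. The `FinalChunk` type and the `clampCount` / `recoverUsage` helpers are illustrative names only, not identifiers from the codebase; the real logic lives in `normalizeUsage` and `normalizeTokenCount` in src/agents/usage.ts.

// llama.cpp's OpenAI-compatible server reports token counts on its final
// streamed chunk in a `timings` object (prompt_n / predicted_n) rather than
// an OpenAI-shaped `usage` object; the patch accepts either form.
type Timings = { prompt_n?: number; predicted_n?: number };

// Hypothetical final-chunk shape, for illustration only.
type FinalChunk = {
  usage?: { prompt_tokens?: number; completion_tokens?: number };
  timings?: Timings;
};

// Mirrors normalizeTokenCount: clamp to a non-negative safe integer.
const clampCount = (value: unknown): number | undefined => {
  if (typeof value !== "number" || !Number.isFinite(value)) {
    return undefined;
  }
  return value <= 0 ? 0 : Math.min(Math.trunc(value), Number.MAX_SAFE_INTEGER);
};

// Prefer OpenAI-shaped usage, then fall back to llama.cpp timings.
function recoverUsage(chunk: FinalChunk): { input?: number; output?: number } {
  return {
    input: clampCount(chunk.usage?.prompt_tokens ?? chunk.timings?.prompt_n),
    output: clampCount(chunk.usage?.completion_tokens ?? chunk.timings?.predicted_n),
  };
}

// A llama.cpp-style final chunk carries only `timings`:
console.log(recoverUsage({ timings: { prompt_n: 30_834, predicted_n: 34 } }));
// -> { input: 30834, output: 34 }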