fix(agents): recover streamed timings usage (#41056) (thanks @xaeon2026)

Peter Steinberger
2026-04-23 04:16:57 +01:00
parent 137a3629cc
commit 2908190ba2
8 changed files with 216 additions and 10 deletions

View File

@@ -9,6 +9,7 @@ Docs: https://docs.openclaw.ai
- Providers/Amazon Bedrock Mantle: add Claude Opus 4.7 through Mantle's Anthropic Messages route with provider-owned bearer-auth streaming, so the model is actually callable without treating AWS bearer tokens like Anthropic API keys. Thanks @wirjo.
- Providers/OpenAI Codex: remove the Codex CLI auth import path from onboarding and provider discovery so OpenClaw no longer copies `~/.codex` OAuth material into agent auth stores; use browser login or device pairing instead. (#70390) Thanks @pashpashpash.
- Providers/OpenAI-compatible: mark known local backends such as vLLM, SGLang, llama.cpp, LM Studio, LocalAI, Jan, TabbyAPI, and text-generation-webui as streaming-usage compatible, so their token accounting no longer degrades to unknown/stale totals. (#68711) Thanks @gaineyllc.
- Providers/OpenAI-compatible: recover streamed token usage from llama.cpp-style `timings.prompt_n` / `timings.predicted_n` metadata and sanitize usage counts before accumulation, fixing unknown or stale totals when compatible servers do not emit an OpenAI-shaped `usage` object. (#41056) Thanks @xaeon2026. (See the sketch after this list.)
- OpenAI/Responses: use OpenAI's native `web_search` tool automatically for direct OpenAI Responses models when web search is enabled and no managed search provider is pinned; explicit providers such as Brave keep the managed `web_search` tool.
- ACPX: add an explicit `openClawToolsMcpBridge` option that injects a core OpenClaw MCP server for selected built-in tools, starting with `cron`.
- Agents/sessions: add mailbox-style `sessions_list` filters for label, agent, and search, plus visibility-scoped derived title and last-message previews. (#69839) Thanks @dangoZhang.
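
As a rough illustration of the streamed-timings entry above: a minimal sketch, assuming a llama.cpp-style final chunk that carries `timings` counters instead of an OpenAI-shaped `usage` object. The helper names and chunk shape here are illustrative only; the shipped logic lives in `usage.ts` and `pi-embedded-subscribe.ts` in the diffs below.

```ts
// Sketch only: mirrors the recovery idea, not the shipped implementation.
type LlamaCppTimings = { prompt_n?: number; predicted_n?: number };
type ChunkLike = {
  usage?: { input?: number; output?: number };
  timings?: LlamaCppTimings;
};

// Clamp a raw count to a non-negative safe integer; drop anything non-numeric.
const sanitizeCount = (value: unknown): number | undefined => {
  if (typeof value !== "number" || !Number.isFinite(value)) return undefined;
  if (value <= 0) return 0;
  return Math.min(Math.trunc(value), Number.MAX_SAFE_INTEGER);
};

// Prefer an OpenAI-shaped `usage` object; fall back to llama.cpp timings counters.
function recoverStreamedUsage(chunk: ChunkLike): { input?: number; output?: number } | undefined {
  const input = sanitizeCount(chunk.usage?.input ?? chunk.timings?.prompt_n);
  const output = sanitizeCount(chunk.usage?.output ?? chunk.timings?.predicted_n);
  if (input === undefined && output === undefined) return undefined;
  return { input, output };
}

// A final chunk without `usage` still yields token counts:
// recoverStreamedUsage({ timings: { prompt_n: 30_834, predicted_n: 34 } })
// => { input: 30834, output: 34 }
```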

View File

@@ -68,6 +68,8 @@ function createMessageUpdateContext(
emitReasoningStream: vi.fn(),
flushBlockReplyBuffer: params.flushBlockReplyBuffer ?? vi.fn(),
resetAssistantMessageState: params.resetAssistantMessageState ?? vi.fn(),
recordAssistantUsage: vi.fn(),
commitAssistantUsage: vi.fn(),
} as unknown as EmbeddedPiSubscribeContext;
}
@@ -114,6 +116,7 @@ function createMessageEndContext(
},
noteLastAssistant: vi.fn(),
recordAssistantUsage: vi.fn(),
commitAssistantUsage: vi.fn(),
log: { debug: vi.fn(), warn: vi.fn() },
stripBlockTags: (text: string) => text,
finalizeAssistantTexts: params.finalizeAssistantTexts ?? vi.fn(),

View File

@@ -389,6 +389,13 @@ export function handleMessageUpdate(
: undefined;
const evtType = typeof assistantRecord?.type === "string" ? assistantRecord.type : "";
if (evtType === "text_end" || evtType === "done" || evtType === "error") {
ctx.recordAssistantUsage(assistantRecord);
if (evtType === "done" || evtType === "error") {
ctx.commitAssistantUsage();
}
}
if (evtType === "thinking_start" || evtType === "thinking_delta" || evtType === "thinking_end") {
if (evtType === "thinking_start" || evtType === "thinking_delta") {
openReasoningStream(ctx);
@@ -613,6 +620,7 @@ export function handleMessageEnd(
const suppressDeterministicApprovalOutput = shouldSuppressDeterministicApprovalOutput(ctx.state);
ctx.noteLastAssistant(assistantMessage);
ctx.recordAssistantUsage((assistantMessage as { usage?: unknown }).usage);
ctx.commitAssistantUsage();
if (suppressVisibleAssistantOutput) {
return;
}

View File

@@ -60,6 +60,8 @@ export type EmbeddedPiSubscribeState = {
assistantTextBaseline: number;
suppressBlockChunks: boolean;
lastReasoningSent?: string;
pendingAssistantUsage?: NormalizedUsage;
assistantUsageCommitted: boolean;
compactionInFlight: boolean;
pendingCompactionRetry: number;
@@ -133,6 +135,7 @@ export type EmbeddedPiSubscribeContext = {
resolveCompactionRetry: () => void;
maybeResolveCompactionWait: () => void;
recordAssistantUsage: (usage: unknown) => void;
commitAssistantUsage: () => void;
incrementCompactionCount: () => void;
getUsageTotals: () => NormalizedUsage | undefined;
getCompactionCount: () => number;

View File

@@ -2,6 +2,7 @@ import type { AssistantMessage } from "@mariozechner/pi-ai";
import { describe, expect, it, vi } from "vitest";
import {
THINKING_TAG_CASES,
createSubscribedSessionHarness,
createStubSessionHarness,
emitAssistantLifecycleErrorAndEnd,
emitMessageStartAndEndForAssistantText,
@@ -10,6 +11,7 @@ import {
findLifecycleErrorAgentEvent,
} from "./pi-embedded-subscribe.e2e-harness.js";
import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js";
import { makeZeroUsageSnapshot } from "./usage.js";
describe("subscribeEmbeddedPiSession", () => {
async function flushBlockReplyCallbacks(): Promise<void> {
@@ -109,6 +111,75 @@ describe("subscribeEmbeddedPiSession", () => {
});
}
it("captures usage from completions timings on done events", () => {
const { emit, subscription } = createSubscribedSessionHarness({ runId: "run" });
emit({ type: "message_start", message: { role: "assistant" } });
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "done",
timings: {
prompt_n: 30_834,
predicted_n: 34,
},
},
});
emit({
type: "message_end",
message: {
role: "assistant",
usage: makeZeroUsageSnapshot(),
},
});
expect(subscription.getUsageTotals()).toEqual({
input: 30_834,
output: 34,
cacheRead: undefined,
cacheWrite: undefined,
total: 30_868,
});
});
it("does not double-count usage when done and message_end carry the same snapshot", () => {
const { emit, subscription } = createSubscribedSessionHarness({ runId: "run" });
const usage = {
input: 100,
output: 20,
totalTokens: 120,
};
emit({ type: "message_start", message: { role: "assistant" } });
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "done",
message: {
role: "assistant",
usage,
},
},
});
emit({
type: "message_end",
message: {
role: "assistant",
usage,
},
});
expect(subscription.getUsageTotals()).toEqual({
input: 100,
output: 20,
cacheRead: undefined,
cacheWrite: undefined,
total: 120,
});
});
it.each(THINKING_TAG_CASES)(
"streams <%s> reasoning via onReasoningStream without leaking into final text",
async ({ open, close }) => {

View File

@@ -107,6 +107,8 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
assistantTextBaseline: 0,
suppressBlockChunks: false, // Avoid late chunk inserts after final text merge.
lastReasoningSent: undefined,
pendingAssistantUsage: undefined,
assistantUsageCommitted: false,
compactionInFlight: false,
pendingCompactionRetry: 0,
compactionRetryResolve: undefined,
@@ -212,6 +214,8 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
state.lastReasoningSent = undefined;
state.reasoningStreamOpen = false;
state.suppressBlockChunks = false;
state.pendingAssistantUsage = undefined;
state.assistantUsageCommitted = false;
state.assistantMessageIndex += 1;
state.lastAssistantStreamItemId = undefined;
state.lastAssistantTextMessageIndex = -1;
@@ -352,11 +356,42 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
const maybeResolveCompactionWait = () => {
resolveCompactionPromiseIfIdle();
};
const resolveAssistantUsage = (usageLike: unknown) => {
const candidates: unknown[] = [usageLike];
if (usageLike && typeof usageLike === "object") {
const record = usageLike as Record<string, unknown>;
const partial =
record.partial && typeof record.partial === "object"
? (record.partial as Record<string, unknown>)
: undefined;
const message =
record.message && typeof record.message === "object"
? (record.message as Record<string, unknown>)
: undefined;
candidates.push(
record.usage,
record.timings,
record.partial,
record.message,
partial?.usage,
partial?.timings,
message?.usage,
message?.timings,
);
}
for (const candidate of candidates) {
const usage = normalizeUsage((candidate ?? undefined) as UsageLike | undefined);
if (hasNonzeroUsage(usage)) {
return usage;
}
}
return undefined;
};
const commitAssistantUsage = () => {
if (state.assistantUsageCommitted || !state.pendingAssistantUsage) {
return;
}
const usage = state.pendingAssistantUsage;
usageTotals.input += usage.input ?? 0;
usageTotals.output += usage.output ?? 0;
usageTotals.cacheRead += usage.cacheRead ?? 0;
@@ -365,6 +400,17 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
usage.total ??
(usage.input ?? 0) + (usage.output ?? 0) + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0);
usageTotals.total += usageTotal;
state.assistantUsageCommitted = true;
};
const recordAssistantUsage = (usageLike: unknown) => {
if (state.assistantUsageCommitted) {
return;
}
const usage = resolveAssistantUsage(usageLike);
if (!usage) {
return;
}
state.pendingAssistantUsage = usage;
};
const getUsageTotals = () => {
const hasUsage =
@@ -759,6 +805,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
resolveCompactionRetry,
maybeResolveCompactionWait,
recordAssistantUsage,
commitAssistantUsage,
incrementCompactionCount,
getUsageTotals,
getCompactionCount: () => compactionCount,

View File

@@ -34,6 +34,53 @@ describe("normalizeUsage", () => {
});
});
it("normalizes llama.cpp completion timings", () => {
const usage = normalizeUsage({
timings: {
prompt_n: 30_834,
predicted_n: 34,
},
});
expect(usage).toEqual({
input: 30_834,
output: 34,
cacheRead: undefined,
cacheWrite: undefined,
total: undefined,
});
});
it("clamps negative and fractional usage counts to safe integers", () => {
const usage = normalizeUsage({
input: -12.8,
output: 9.9,
cacheRead: -1,
cacheWrite: 3.2,
total: -99,
});
expect(usage).toEqual({
input: 0,
output: 9,
cacheRead: 0,
cacheWrite: 3,
total: 0,
});
});
it("caps extremely large usage counts at Number.MAX_SAFE_INTEGER", () => {
const usage = normalizeUsage({
input: 1e308,
output: Number.MAX_SAFE_INTEGER + 1000,
});
expect(usage).toEqual({
input: Number.MAX_SAFE_INTEGER,
output: Number.MAX_SAFE_INTEGER,
cacheRead: undefined,
cacheWrite: undefined,
total: undefined,
});
});
it("returns undefined for empty usage objects", () => {
expect(normalizeUsage({})).toBeUndefined();
});

View File

@@ -28,6 +28,13 @@ export type UsageLike = {
total_tokens?: number;
cache_read?: number;
cache_write?: number;
// llama.cpp-style streamed completion metadata.
prompt_n?: number;
predicted_n?: number;
timings?: {
prompt_n?: number;
predicted_n?: number;
};
};
export type NormalizedUsage = {
@@ -79,12 +86,23 @@ export function hasNonzeroUsage(usage?: NormalizedUsage | null): usage is Normal
);
}
const normalizeTokenCount = (value: unknown): number | undefined => {
const numeric = asFiniteNumber(value);
if (numeric === undefined) {
return undefined;
}
if (numeric <= 0) {
return 0;
}
return Math.min(Math.trunc(numeric), Number.MAX_SAFE_INTEGER);
};
export function normalizeUsage(raw?: UsageLike | null): NormalizedUsage | undefined {
if (!raw) {
return undefined;
}
const cacheRead = normalizeTokenCount(
raw.cacheRead ??
raw.cache_read ??
raw.cache_read_input_tokens ??
@@ -94,7 +112,13 @@ export function normalizeUsage(raw?: UsageLike | null): NormalizedUsage | undefi
);
const rawInputValue =
raw.input ??
raw.inputTokens ??
raw.input_tokens ??
raw.promptTokens ??
raw.prompt_tokens ??
raw.prompt_n ??
raw.timings?.prompt_n;
const usesOpenAIStylePromptTotals =
raw.cached_tokens !== undefined ||
@@ -111,18 +135,20 @@ export function normalizeUsage(raw?: UsageLike | null): NormalizedUsage | undefi
rawInput !== undefined && usesOpenAIStylePromptTotals && cacheRead !== undefined
? rawInput - cacheRead
: rawInput;
const input = normalizeTokenCount(normalizedInput);
const output = normalizeTokenCount(
raw.output ??
raw.outputTokens ??
raw.output_tokens ??
raw.completionTokens ??
raw.completion_tokens ??
raw.predicted_n ??
raw.timings?.predicted_n,
);
const cacheWrite = normalizeTokenCount(
raw.cacheWrite ?? raw.cache_write ?? raw.cache_creation_input_tokens,
);
const total = normalizeTokenCount(raw.total ?? raw.totalTokens ?? raw.total_tokens);
if (
input === undefined &&