diff --git a/CHANGELOG.md b/CHANGELOG.md index 7862b58dd1e..05e7842b191 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -111,6 +111,7 @@ Docs: https://docs.openclaw.ai - Daemon/gateway: prevent systemd restart storms on configuration errors by exiting with `EX_CONFIG` and adding generated unit restart-prevention guards. (#63913) Thanks @neo1027144-creator. - Agents/exec: prevent gateway crash ("Agent listener invoked outside active run") when a subagent exec tool produces stdout/stderr after the agent run has ended or been aborted. (#62821) Thanks @openperf. - Browser/tabs: route `/tabs/action` close/select through the same browser endpoint reachability and policy checks as list/new (including Playwright-backed remote tab operations), reject CDP HTTP redirects on probe requests, and sanitize blocked-endpoint error responses so tab list/focus/close flows fail closed without echoing raw policy details back to callers. (#63332) +- Gateway/OpenAI compat: return real `usage` for non-stream `/v1/chat/completions` responses, emit the final usage chunk when `stream_options.include_usage=true`, and bound usage-gated stream finalization after lifecycle end. (#62986) Thanks @Lellansin. ## 2026.4.9 diff --git a/src/agents/pi-embedded-subscribe.handlers.lifecycle.test.ts b/src/agents/pi-embedded-subscribe.handlers.lifecycle.test.ts index 451cec36526..13416df0eb0 100644 --- a/src/agents/pi-embedded-subscribe.handlers.lifecycle.test.ts +++ b/src/agents/pi-embedded-subscribe.handlers.lifecycle.test.ts @@ -224,4 +224,63 @@ describe("handleAgentEnd", () => { resolveChannelFlush?.(); await endPromise; }); + + it("emits lifecycle end after async channel flush completes", async () => { + let resolveChannelFlush: (() => void) | undefined; + const onAgentEvent = vi.fn(); + const onBlockReplyFlush = vi.fn( + () => + new Promise((resolve) => { + resolveChannelFlush = resolve; + }), + ); + const ctx = createContext(undefined, { onAgentEvent, onBlockReplyFlush }); + + const endPromise = handleAgentEnd(ctx); + + expect(onAgentEvent).not.toHaveBeenCalled(); + + resolveChannelFlush?.(); + await endPromise; + + expect(onAgentEvent).toHaveBeenCalledWith({ + stream: "lifecycle", + data: { phase: "end" }, + }); + }); + + it("emits lifecycle error after async channel flush completes", async () => { + let resolveChannelFlush: (() => void) | undefined; + const onAgentEvent = vi.fn(); + const onBlockReplyFlush = vi.fn( + () => + new Promise((resolve) => { + resolveChannelFlush = resolve; + }), + ); + const ctx = createContext( + { + role: "assistant", + stopReason: "error", + errorMessage: "connection refused", + content: [{ type: "text", text: "" }], + }, + { onAgentEvent, onBlockReplyFlush }, + ); + + const endPromise = handleAgentEnd(ctx); + + expect(onAgentEvent).not.toHaveBeenCalled(); + + resolveChannelFlush?.(); + await endPromise; + + expect(onAgentEvent).toHaveBeenCalledWith({ + stream: "lifecycle", + data: { + phase: "error", + error: "LLM request failed: connection refused by the provider endpoint.", + }, + }); + }); }); diff --git a/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts b/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts index 3b573c24fc6..3911466219f 100644 --- a/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts +++ b/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts @@ -38,6 +38,7 @@ export function handleAgentStart(ctx: EmbeddedPiSubscribeContext) { export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext) { const lastAssistant = ctx.state.lastAssistant; const isError = isAssistantMessage(lastAssistant) && lastAssistant.stopReason === "error"; + let lifecycleErrorText: string | undefined; if (isError && lastAssistant) { const friendlyError = formatAssistantErrorText(lastAssistant, { @@ -54,6 +55,7 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext) { const observedError = buildApiErrorObservationFields(rawError); const safeErrorText = buildTextObservationFields(errorText).textPreview ?? "LLM request failed."; + lifecycleErrorText = safeErrorText; const safeRunId = sanitizeForConsole(ctx.params.runId) ?? "-"; const safeModel = sanitizeForConsole(lastAssistant.model) ?? "unknown"; const safeProvider = sanitizeForConsole(lastAssistant.provider) ?? "unknown"; @@ -71,24 +73,30 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext) { ...observedError, consoleMessage: `embedded run agent end: runId=${safeRunId} isError=true model=${safeModel} provider=${safeProvider} error=${safeErrorText}${rawErrorConsoleSuffix}`, }); - emitAgentEvent({ - runId: ctx.params.runId, - stream: "lifecycle", - data: { - phase: "error", - error: safeErrorText, - endedAt: Date.now(), - }, - }); - void ctx.params.onAgentEvent?.({ - stream: "lifecycle", - data: { - phase: "error", - error: safeErrorText, - }, - }); } else { ctx.log.debug(`embedded run agent end: runId=${ctx.params.runId} isError=${isError}`); + } + + const emitLifecycleTerminal = () => { + if (isError) { + emitAgentEvent({ + runId: ctx.params.runId, + stream: "lifecycle", + data: { + phase: "error", + error: lifecycleErrorText ?? "LLM request failed.", + endedAt: Date.now(), + }, + }); + void ctx.params.onAgentEvent?.({ + stream: "lifecycle", + data: { + phase: "error", + error: lifecycleErrorText ?? "LLM request failed.", + }, + }); + return; + } emitAgentEvent({ runId: ctx.params.runId, stream: "lifecycle", @@ -101,7 +109,7 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext) { stream: "lifecycle", data: { phase: "end" }, }); - } + }; const finalizeAgentEnd = () => { ctx.state.blockState.thinking = false; @@ -140,11 +148,14 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext) { const flushBlockReplyBufferResult = ctx.flushBlockReplyBuffer(); finalizeAgentEnd(); if (isPromiseLike(flushBlockReplyBufferResult)) { - return flushBlockReplyBufferResult.then(() => flushPendingMediaAndChannel()); + return flushBlockReplyBufferResult + .then(() => flushPendingMediaAndChannel()) + .then(() => emitLifecycleTerminal()); } const flushPendingMediaAndChannelResult = flushPendingMediaAndChannel(); if (isPromiseLike(flushPendingMediaAndChannelResult)) { - return flushPendingMediaAndChannelResult; + return flushPendingMediaAndChannelResult.then(() => emitLifecycleTerminal()); } + emitLifecycleTerminal(); } diff --git a/src/agents/usage.test.ts b/src/agents/usage.test.ts index 3ac11a7bd93..68f69113b67 100644 --- a/src/agents/usage.test.ts +++ b/src/agents/usage.test.ts @@ -4,6 +4,7 @@ import { hasNonzeroUsage, derivePromptTokens, deriveSessionTotalTokens, + toOpenAiChatCompletionsUsage, } from "./usage.js"; describe("normalizeUsage", () => { @@ -146,6 +147,90 @@ describe("normalizeUsage", () => { }); }); +describe("toOpenAiChatCompletionsUsage", () => { + it("uses max(component sum, aggregate total) when breakdown is partial", () => { + const usage = normalizeUsage({ output_tokens: 20, total_tokens: 100 }); + expect(toOpenAiChatCompletionsUsage(usage)).toEqual({ + prompt_tokens: 0, + completion_tokens: 20, + total_tokens: 100, + }); + }); + + it("uses component sum when it exceeds aggregate total", () => { + expect( + toOpenAiChatCompletionsUsage({ + input: 30, + output: 40, + total: 50, + }), + ).toEqual({ + prompt_tokens: 30, + completion_tokens: 40, + total_tokens: 70, + }); + }); + + it("uses aggregate total when only total is present", () => { + const usage = normalizeUsage({ total_tokens: 42 }); + expect(toOpenAiChatCompletionsUsage(usage)).toEqual({ + prompt_tokens: 0, + completion_tokens: 0, + total_tokens: 42, + }); + }); + + it("returns zeros for undefined usage", () => { + expect(toOpenAiChatCompletionsUsage(undefined)).toEqual({ + prompt_tokens: 0, + completion_tokens: 0, + total_tokens: 0, + }); + }); + + it("raises total_tokens with aggregate when cache write is excluded from prompt sum", () => { + expect( + toOpenAiChatCompletionsUsage({ + input: 10, + output: 5, + cacheWrite: 100, + total: 200, + }), + ).toEqual({ + prompt_tokens: 10, + completion_tokens: 5, + total_tokens: 200, + }); + }); + + it("clamps negative completion before deriving total_tokens", () => { + expect( + toOpenAiChatCompletionsUsage({ + input: 3, + output: -5, + }), + ).toEqual({ + prompt_tokens: 3, + completion_tokens: 0, + total_tokens: 3, + }); + }); + + it("preserves aggregate total when components are partially negative", () => { + expect( + toOpenAiChatCompletionsUsage({ + input: 3, + output: -5, + total: 7, + }), + ).toEqual({ + prompt_tokens: 3, + completion_tokens: 0, + total_tokens: 7, + }); + }); +}); + describe("hasNonzeroUsage", () => { it("returns true when cache read is nonzero", () => { const usage = { cacheRead: 100 }; diff --git a/src/agents/usage.ts b/src/agents/usage.ts index d353f144be1..dbe1e272a0a 100644 --- a/src/agents/usage.ts +++ b/src/agents/usage.ts @@ -143,6 +143,41 @@ export function normalizeUsage(raw?: UsageLike | null): NormalizedUsage | undefi }; } +/** + * Maps normalized usage to OpenAI Chat Completions `usage` fields. + * + * `prompt_tokens` is input + cacheRead (cache write is excluded to match the + * OpenAI-style breakdown used by the compat endpoint). + * + * `total_tokens` is the greater of the component sum and aggregate `total` when + * present, so a partial breakdown cannot discard a valid upstream total. + */ +export function toOpenAiChatCompletionsUsage(usage: NormalizedUsage | undefined): { + prompt_tokens: number; + completion_tokens: number; + total_tokens: number; +} { + const input = usage?.input ?? 0; + const output = usage?.output ?? 0; + const cacheRead = usage?.cacheRead ?? 0; + const promptTokens = Math.max(0, input + cacheRead); + const completionTokens = Math.max(0, output); + const componentTotal = promptTokens + completionTokens; + const aggregateRaw = usage?.total; + const aggregateTotal = + typeof aggregateRaw === "number" && Number.isFinite(aggregateRaw) + ? Math.max(0, aggregateRaw) + : undefined; + const totalTokens = + aggregateTotal !== undefined ? Math.max(componentTotal, aggregateTotal) : componentTotal; + + return { + prompt_tokens: promptTokens, + completion_tokens: completionTokens, + total_tokens: totalTokens, + }; +} + export function derivePromptTokens(usage?: { input?: number; cacheRead?: number; diff --git a/src/gateway/openai-http.test.ts b/src/gateway/openai-http.test.ts index 85704e56c0b..2a1442e7e89 100644 --- a/src/gateway/openai-http.test.ts +++ b/src/gateway/openai-http.test.ts @@ -700,6 +700,159 @@ describe("OpenAI-compatible HTTP API (e2e)", () => { expect(msg.content).toBe("hello"); } + { + agentCommand.mockClear(); + agentCommand.mockResolvedValueOnce({ + payloads: [{ text: "usage basic" }], + meta: { + agentMeta: { + usage: { + input: 42, + output: 17, + }, + }, + }, + } as never); + const json = await postSyncUserMessage("usage"); + expect(json.usage).toEqual({ + prompt_tokens: 42, + completion_tokens: 17, + total_tokens: 59, + }); + } + + { + agentCommand.mockClear(); + agentCommand.mockResolvedValueOnce({ + payloads: [{ text: "usage cache" }], + meta: { + agentMeta: { + usage: { + input: 10, + output: 5, + cacheRead: 20, + cacheWrite: 3, + }, + }, + }, + } as never); + const json = await postSyncUserMessage("usage"); + expect(json.usage).toEqual({ + prompt_tokens: 30, + completion_tokens: 5, + total_tokens: 35, + }); + } + + { + agentCommand.mockClear(); + agentCommand.mockResolvedValueOnce({ + payloads: [{ text: "usage total" }], + meta: { + agentMeta: { + usage: { + input: 10, + output: 5, + total: 100, + }, + }, + }, + } as never); + const json = await postSyncUserMessage("usage"); + expect(json.usage).toEqual({ + prompt_tokens: 10, + completion_tokens: 5, + total_tokens: 100, + }); + } + + { + agentCommand.mockClear(); + agentCommand.mockResolvedValueOnce({ + payloads: [{ text: "usage total only" }], + meta: { + agentMeta: { + usage: { + total: 123, + }, + }, + }, + } as never); + const json = await postSyncUserMessage("usage"); + expect(json.usage).toEqual({ + prompt_tokens: 0, + completion_tokens: 0, + total_tokens: 123, + }); + } + + { + agentCommand.mockClear(); + agentCommand.mockResolvedValueOnce({ + payloads: [{ text: "usage non-finite" }], + meta: { + agentMeta: { + usage: { + input: Number.POSITIVE_INFINITY, + output: Number.NaN, + cacheRead: 2, + cacheWrite: Number.POSITIVE_INFINITY, + total: Number.NaN, + }, + }, + }, + } as never); + const json = await postSyncUserMessage("usage"); + expect(json.usage).toEqual({ + prompt_tokens: 2, + completion_tokens: 0, + total_tokens: 2, + }); + } + + { + agentCommand.mockClear(); + agentCommand.mockResolvedValueOnce({ + payloads: [{ text: "usage non-finite aggregate fallback" }], + meta: { + agentMeta: { + usage: { + input: Number.POSITIVE_INFINITY, + output: Number.NaN, + total: 123, + }, + }, + }, + } as never); + const json = await postSyncUserMessage("usage"); + expect(json.usage).toEqual({ + prompt_tokens: 0, + completion_tokens: 0, + total_tokens: 123, + }); + } + + { + agentCommand.mockClear(); + agentCommand.mockResolvedValueOnce({ + payloads: [{ text: "usage cache-write only" }], + meta: { + agentMeta: { + usage: { + cacheWrite: 10, + total: 10, + }, + }, + }, + } as never); + const json = await postSyncUserMessage("usage"); + expect(json.usage).toEqual({ + prompt_tokens: 0, + completion_tokens: 0, + total_tokens: 10, + }); + } + { agentCommand.mockClear(); agentCommand.mockResolvedValueOnce({ payloads: [{ text: "" }] } as never); @@ -802,6 +955,8 @@ describe("OpenAI-compatible HTTP API (e2e)", () => { .filter((v): v is string => typeof v === "string") .join(""); expect(allContent).toBe("hello"); + const usageChunks = jsonChunks.filter((c) => "usage" in c); + expect(usageChunks).toHaveLength(0); } { @@ -879,6 +1034,225 @@ describe("OpenAI-compatible HTTP API (e2e)", () => { } }); + it("includes usage in final stream chunk when stream_options.include_usage=true", async () => { + const port = enabledPort; + agentCommand.mockClear(); + agentCommand.mockImplementationOnce((async (opts: unknown) => { + const runId = (opts as { runId?: string } | undefined)?.runId ?? ""; + emitAgentEvent({ runId, stream: "assistant", data: { delta: "he" } }); + emitAgentEvent({ runId, stream: "assistant", data: { delta: "llo" } }); + return { + payloads: [{ text: "hello" }], + meta: { + agentMeta: { + usage: { + input: 12, + output: 5, + cacheRead: 3, + cacheWrite: 0, + total: 20, + }, + }, + }, + }; + }) as never); + + const res = await postChatCompletions(port, { + stream: true, + stream_options: { include_usage: true }, + model: "openclaw", + messages: [{ role: "user", content: "hi" }], + }); + expect(res.status).toBe(200); + + const text = await res.text(); + const data = parseSseDataLines(text); + expect(data[data.length - 1]).toBe("[DONE]"); + const jsonChunks = data + .filter((d) => d !== "[DONE]") + .map((d) => JSON.parse(d) as Record); + + const usageChunk = jsonChunks.find((chunk) => "usage" in chunk); + expect(usageChunk?.usage).toEqual({ + prompt_tokens: 15, + completion_tokens: 5, + total_tokens: 20, + }); + expect(usageChunk?.choices).toEqual([]); + }); + + it("keeps aggregate-only usage total in final stream usage chunk", async () => { + const port = enabledPort; + agentCommand.mockClear(); + agentCommand.mockImplementationOnce((async (opts: unknown) => { + const runId = (opts as { runId?: string } | undefined)?.runId ?? ""; + emitAgentEvent({ runId, stream: "assistant", data: { delta: "hello" } }); + return { + payloads: [{ text: "hello" }], + meta: { + agentMeta: { + usage: { + total: 123, + }, + }, + }, + }; + }) as never); + + const res = await postChatCompletions(port, { + stream: true, + stream_options: { include_usage: true }, + model: "openclaw", + messages: [{ role: "user", content: "hi" }], + }); + expect(res.status).toBe(200); + + const text = await res.text(); + const data = parseSseDataLines(text); + expect(data[data.length - 1]).toBe("[DONE]"); + const jsonChunks = data + .filter((d) => d !== "[DONE]") + .map((d) => JSON.parse(d) as Record); + const usageChunk = jsonChunks.find((chunk) => "usage" in chunk); + expect(usageChunk?.usage).toEqual({ + prompt_tokens: 0, + completion_tokens: 0, + total_tokens: 123, + }); + }); + + it("finalizes stream when lifecycle end arrives before usage is available", async () => { + const port = enabledPort; + agentCommand.mockClear(); + agentCommand.mockImplementationOnce( + ((opts: unknown) => + new Promise((resolve) => { + const runId = (opts as { runId?: string } | undefined)?.runId ?? ""; + emitAgentEvent({ runId, stream: "assistant", data: { delta: "hello" } }); + emitAgentEvent({ runId, stream: "lifecycle", data: { phase: "end" } }); + setTimeout(() => { + resolve({ + payloads: [{ text: "hello" }], + meta: { + agentMeta: { + usage: { input: 7, output: 3, total: 10 }, + }, + }, + }); + }, 100); + })) as never, + ); + + const res = await postChatCompletions(port, { + stream: true, + stream_options: { include_usage: true }, + model: "openclaw", + messages: [{ role: "user", content: "hi" }], + }); + expect(res.status).toBe(200); + + const text = await res.text(); + const data = parseSseDataLines(text); + expect(data[data.length - 1]).toBe("[DONE]"); + const jsonChunks = data + .filter((d) => d !== "[DONE]") + .map((d) => JSON.parse(d) as Record); + const usageChunk = jsonChunks.find((chunk) => "usage" in chunk); + expect(usageChunk?.usage).toEqual({ + prompt_tokens: 7, + completion_tokens: 3, + total_tokens: 10, + }); + }); + + it( + "cleans up usage-enabled stream when client disconnects before usage arrives", + { timeout: 15_000 }, + async () => { + const port = enabledPort; + let serverAbortSignal: AbortSignal | undefined; + + agentCommand.mockClear(); + agentCommand.mockImplementationOnce( + (opts: unknown) => + new Promise((resolve) => { + const runId = (opts as { runId?: string } | undefined)?.runId ?? ""; + const signal = (opts as { abortSignal?: AbortSignal } | undefined)?.abortSignal; + serverAbortSignal = signal; + emitAgentEvent({ runId, stream: "assistant", data: { delta: "hello" } }); + emitAgentEvent({ runId, stream: "lifecycle", data: { phase: "end" } }); + if (signal?.aborted) { + resolve(undefined); + return; + } + signal?.addEventListener("abort", () => resolve(undefined), { once: true }); + }), + ); + + const clientReq = http.request({ + hostname: "127.0.0.1", + port, + path: "/v1/chat/completions", + method: "POST", + headers: { + "content-type": "application/json", + authorization: "Bearer secret", + }, + }); + clientReq.on("error", () => {}); + clientReq.end( + JSON.stringify({ + stream: true, + stream_options: { include_usage: true }, + model: "openclaw", + messages: [{ role: "user", content: "hi" }], + }), + ); + + await vi.waitFor(() => { + expect(agentCommand).toHaveBeenCalledTimes(1); + }); + + clientReq.destroy(); + + await vi.waitFor( + () => { + expect(serverAbortSignal?.aborted).toBe(true); + }, + { timeout: 5_000, interval: 50 }, + ); + }, + ); + + it("does not block stream finalization on usage when include_usage is not requested", async () => { + const port = enabledPort; + agentCommand.mockClear(); + agentCommand.mockImplementationOnce( + ((opts: unknown) => + new Promise(() => { + const runId = (opts as { runId?: string } | undefined)?.runId ?? ""; + emitAgentEvent({ runId, stream: "assistant", data: { delta: "hello" } }); + emitAgentEvent({ runId, stream: "lifecycle", data: { phase: "end" } }); + })) as never, + ); + + const res = await postChatCompletions(port, { + stream: true, + model: "openclaw", + messages: [{ role: "user", content: "hi" }], + }); + expect(res.status).toBe(200); + + const text = await res.text(); + const data = parseSseDataLines(text); + expect(data[data.length - 1]).toBe("[DONE]"); + const jsonChunks = data + .filter((d) => d !== "[DONE]") + .map((d) => JSON.parse(d) as Record); + const usageChunks = jsonChunks.filter((chunk) => "usage" in chunk); + expect(usageChunks).toHaveLength(0); + }); + it("treats shared-secret bearer callers as owner operators", async () => { const port = await getFreePort(); const server = await startTokenServer(port); diff --git a/src/gateway/openai-http.ts b/src/gateway/openai-http.ts index f3adad693b5..bc9a9e7bbff 100644 --- a/src/gateway/openai-http.ts +++ b/src/gateway/openai-http.ts @@ -1,6 +1,7 @@ import { randomUUID } from "node:crypto"; import type { IncomingMessage, ServerResponse } from "node:http"; import type { ImageContent } from "../agents/command/types.js"; +import { normalizeUsage, toOpenAiChatCompletionsUsage } from "../agents/usage.js"; import { createDefaultDeps } from "../cli/deps.js"; import { agentCommandFromIngress } from "../commands/agent.js"; import type { GatewayHttpChatCompletionsConfig } from "../config/types.gateway.js"; @@ -57,6 +58,8 @@ type OpenAiChatMessage = { type OpenAiChatCompletionRequest = { model?: unknown; stream?: unknown; + // Naming/style reference: src/agents/openai-transport-stream.ts:1262-1273 + stream_options?: unknown; messages?: unknown; user?: unknown; }; @@ -163,6 +166,40 @@ function writeAssistantContentChunk( }); } +function writeAssistantStopChunk(res: ServerResponse, params: { runId: string; model: string }) { + writeSse(res, { + id: params.runId, + object: "chat.completion.chunk", + created: Math.floor(Date.now() / 1000), + model: params.model, + choices: [ + { + index: 0, + delta: {}, + finish_reason: "stop", + }, + ], + }); +} + +function writeUsageChunk( + res: ServerResponse, + params: { + runId: string; + model: string; + usage: { prompt_tokens: number; completion_tokens: number; total_tokens: number }; + }, +) { + writeSse(res, { + id: params.runId, + object: "chat.completion.chunk", + created: Math.floor(Date.now() / 1000), + model: params.model, + choices: [], + usage: params.usage, + }); +} + function asMessages(val: unknown): OpenAiChatMessage[] { return Array.isArray(val) ? (val as OpenAiChatMessage[]) : []; } @@ -421,6 +458,44 @@ function resolveAgentResponseText(result: unknown): string { return content || "No response from OpenClaw."; } +type AgentUsageMeta = { + input?: number; + output?: number; + cacheRead?: number; + cacheWrite?: number; + total?: number; +}; + +function resolveRawAgentUsage(result: unknown): AgentUsageMeta | undefined { + return ( + result as { + meta?: { + agentMeta?: { + usage?: AgentUsageMeta; + }; + }; + } | null + )?.meta?.agentMeta?.usage; +} + +function resolveChatCompletionUsage(result: unknown): { + prompt_tokens: number; + completion_tokens: number; + total_tokens: number; +} { + return toOpenAiChatCompletionsUsage(normalizeUsage(resolveRawAgentUsage(result))); +} + +function resolveIncludeUsageForStreaming(payload: OpenAiChatCompletionRequest): boolean { + // Keep parsing aligned with OpenAI wire-format field names. + // Flow reference: src/agents/openai-transport-stream.ts:1262-1273 + const streamOptions = payload.stream_options; + if (!streamOptions || typeof streamOptions !== "object" || Array.isArray(streamOptions)) { + return false; + } + return (streamOptions as { include_usage?: unknown }).include_usage === true; +} + export async function handleOpenAiHttpRequest( req: IncomingMessage, res: ServerResponse, @@ -451,6 +526,7 @@ export async function handleOpenAiHttpRequest( const payload = coerceRequest(handled.body); const stream = Boolean(payload.stream); + const streamIncludeUsage = stream && resolveIncludeUsageForStreaming(payload); const model = typeof payload.model === "string" ? payload.model : "openclaw"; const user = typeof payload.user === "string" ? payload.user : undefined; @@ -526,6 +602,7 @@ export async function handleOpenAiHttpRequest( } const content = resolveAgentResponseText(result); + const usage = resolveChatCompletionUsage(result); sendJson(res, 200, { id: runId, @@ -539,7 +616,7 @@ export async function handleOpenAiHttpRequest( finish_reason: "stop", }, ], - usage: { prompt_tokens: 0, completion_tokens: 0, total_tokens: 0 }, + usage, }); } catch (err) { if (abortController.signal.aborted) { @@ -558,10 +635,45 @@ export async function handleOpenAiHttpRequest( setSseHeaders(res); let wroteRole = false; + let wroteStopChunk = false; let sawAssistantDelta = false; + let finalUsage: + | { + prompt_tokens: number; + completion_tokens: number; + total_tokens: number; + } + | undefined; + let finalizeRequested = false; let closed = false; let stopWatchingDisconnect = () => {}; + const maybeFinalize = () => { + if (closed || !finalizeRequested) { + return; + } + if (streamIncludeUsage && !finalUsage) { + return; + } + closed = true; + stopWatchingDisconnect(); + unsubscribe(); + if (!wroteStopChunk) { + writeAssistantStopChunk(res, { runId, model }); + wroteStopChunk = true; + } + if (streamIncludeUsage && finalUsage) { + writeUsageChunk(res, { runId, model, usage: finalUsage }); + } + writeDone(res); + res.end(); + }; + + const requestFinalize = () => { + finalizeRequested = true; + maybeFinalize(); + }; + const unsubscribe = onAgentEvent((evt) => { if (evt.runId !== runId) { return; @@ -594,11 +706,7 @@ export async function handleOpenAiHttpRequest( if (evt.stream === "lifecycle") { const phase = evt.data?.phase; if (phase === "end" || phase === "error") { - closed = true; - stopWatchingDisconnect(); - unsubscribe(); - writeDone(res); - res.end(); + requestFinalize(); } } }); @@ -616,6 +724,8 @@ export async function handleOpenAiHttpRequest( return; } + finalUsage = resolveChatCompletionUsage(result); + if (!sawAssistantDelta) { if (!wroteRole) { wroteRole = true; @@ -632,6 +742,7 @@ export async function handleOpenAiHttpRequest( finishReason: null, }); } + requestFinalize(); } catch (err) { if (closed || abortController.signal.aborted) { return; @@ -643,18 +754,25 @@ export async function handleOpenAiHttpRequest( content: "Error: internal error", finishReason: "stop", }); + wroteStopChunk = true; + finalUsage = { + prompt_tokens: 0, + completion_tokens: 0, + total_tokens: 0, + }; emitAgentEvent({ runId, stream: "lifecycle", data: { phase: "error" }, }); + requestFinalize(); } finally { if (!closed) { - closed = true; - stopWatchingDisconnect(); - unsubscribe(); - writeDone(res); - res.end(); + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { phase: "end" }, + }); } } })();