fix: return real usage for OpenAI-compatible chat completions (#62986) (thanks @Lellansin)

* Gateway: fix chat completions usage compatibility

* Gateway: clarify usage-gated stream wait

* Gateway: preserve aggregate usage totals

* Agents: clamp usage components before total

* fix(gateway): bound usage stream finalization

* fix: add OpenAI compat usage changelog (#62986) (thanks @Lellansin)

* fix(agents): emit lifecycle terminal events after flush

---------

Co-authored-by: Ayaan Zaidi <hi@obviy.us>
Author: Lellansin Huang
Date: 2026-04-11 00:16:24 +08:00
Committed by: GitHub
Parent: f64c84ab6b
Commit: 2ccd1839f2
7 changed files with 713 additions and 30 deletions


@@ -111,6 +111,7 @@ Docs: https://docs.openclaw.ai
- Daemon/gateway: prevent systemd restart storms on configuration errors by exiting with `EX_CONFIG` and adding generated unit restart-prevention guards. (#63913) Thanks @neo1027144-creator.
- Agents/exec: prevent gateway crash ("Agent listener invoked outside active run") when a subagent exec tool produces stdout/stderr after the agent run has ended or been aborted. (#62821) Thanks @openperf.
- Browser/tabs: route `/tabs/action` close/select through the same browser endpoint reachability and policy checks as list/new (including Playwright-backed remote tab operations), reject CDP HTTP redirects on probe requests, and sanitize blocked-endpoint error responses so tab list/focus/close flows fail closed without echoing raw policy details back to callers. (#63332)
- Gateway/OpenAI compat: return real `usage` for non-stream `/v1/chat/completions` responses, emit the final usage chunk when `stream_options.include_usage=true`, and bound usage-gated stream finalization after lifecycle end. (#62986) Thanks @Lellansin.
## 2026.4.9
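A minimal client-side sketch of the usage behavior described in the entry above — the endpoint host/port and bearer token are placeholder assumptions; only `stream_options.include_usage`, the empty final `choices`, and the example totals (taken from the e2e tests further down) come from this change:

// Illustrative only: endpoint and credentials are assumptions, not part of the change.
const res = await fetch("http://127.0.0.1:3000/v1/chat/completions", {
  method: "POST",
  headers: { "content-type": "application/json", authorization: "Bearer secret" },
  body: JSON.stringify({
    model: "openclaw",
    stream: true,
    stream_options: { include_usage: true }, // opt in to the final usage chunk
    messages: [{ role: "user", content: "hi" }],
  }),
});
// The last SSE data chunk before [DONE] now carries real totals, e.g.
// data: {"choices":[],"usage":{"prompt_tokens":15,"completion_tokens":5,"total_tokens":20}}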


@@ -224,4 +224,63 @@ describe("handleAgentEnd", () => {
resolveChannelFlush?.();
await endPromise;
});
it("emits lifecycle end after async channel flush completes", async () => {
let resolveChannelFlush: (() => void) | undefined;
const onAgentEvent = vi.fn();
const onBlockReplyFlush = vi.fn(
() =>
new Promise<void>((resolve) => {
resolveChannelFlush = resolve;
}),
);
const ctx = createContext(undefined, { onAgentEvent, onBlockReplyFlush });
const endPromise = handleAgentEnd(ctx);
expect(onAgentEvent).not.toHaveBeenCalled();
resolveChannelFlush?.();
await endPromise;
expect(onAgentEvent).toHaveBeenCalledWith({
stream: "lifecycle",
data: { phase: "end" },
});
});
it("emits lifecycle error after async channel flush completes", async () => {
let resolveChannelFlush: (() => void) | undefined;
const onAgentEvent = vi.fn();
const onBlockReplyFlush = vi.fn(
() =>
new Promise<void>((resolve) => {
resolveChannelFlush = resolve;
}),
);
const ctx = createContext(
{
role: "assistant",
stopReason: "error",
errorMessage: "connection refused",
content: [{ type: "text", text: "" }],
},
{ onAgentEvent, onBlockReplyFlush },
);
const endPromise = handleAgentEnd(ctx);
expect(onAgentEvent).not.toHaveBeenCalled();
resolveChannelFlush?.();
await endPromise;
expect(onAgentEvent).toHaveBeenCalledWith({
stream: "lifecycle",
data: {
phase: "error",
error: "LLM request failed: connection refused by the provider endpoint.",
},
});
});
});


@@ -38,6 +38,7 @@ export function handleAgentStart(ctx: EmbeddedPiSubscribeContext) {
export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext) {
const lastAssistant = ctx.state.lastAssistant;
const isError = isAssistantMessage(lastAssistant) && lastAssistant.stopReason === "error";
let lifecycleErrorText: string | undefined;
if (isError && lastAssistant) {
const friendlyError = formatAssistantErrorText(lastAssistant, {
@@ -54,6 +55,7 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext) {
const observedError = buildApiErrorObservationFields(rawError);
const safeErrorText =
buildTextObservationFields(errorText).textPreview ?? "LLM request failed.";
lifecycleErrorText = safeErrorText;
const safeRunId = sanitizeForConsole(ctx.params.runId) ?? "-";
const safeModel = sanitizeForConsole(lastAssistant.model) ?? "unknown";
const safeProvider = sanitizeForConsole(lastAssistant.provider) ?? "unknown";
@@ -71,24 +73,30 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext) {
...observedError,
consoleMessage: `embedded run agent end: runId=${safeRunId} isError=true model=${safeModel} provider=${safeProvider} error=${safeErrorText}${rawErrorConsoleSuffix}`,
});
emitAgentEvent({
runId: ctx.params.runId,
stream: "lifecycle",
data: {
phase: "error",
error: safeErrorText,
endedAt: Date.now(),
},
});
void ctx.params.onAgentEvent?.({
stream: "lifecycle",
data: {
phase: "error",
error: safeErrorText,
},
});
} else {
ctx.log.debug(`embedded run agent end: runId=${ctx.params.runId} isError=${isError}`);
}
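// emitLifecycleTerminal is deferred until the flush chain at the end of this
// handler resolves, so the terminal lifecycle event ("error" or "end") is only
// emitted after buffered block-reply, media, and channel output has flushed.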
const emitLifecycleTerminal = () => {
if (isError) {
emitAgentEvent({
runId: ctx.params.runId,
stream: "lifecycle",
data: {
phase: "error",
error: lifecycleErrorText ?? "LLM request failed.",
endedAt: Date.now(),
},
});
void ctx.params.onAgentEvent?.({
stream: "lifecycle",
data: {
phase: "error",
error: lifecycleErrorText ?? "LLM request failed.",
},
});
return;
}
emitAgentEvent({
runId: ctx.params.runId,
stream: "lifecycle",
@@ -101,7 +109,7 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext) {
stream: "lifecycle",
data: { phase: "end" },
});
}
};
const finalizeAgentEnd = () => {
ctx.state.blockState.thinking = false;
@@ -140,11 +148,14 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext) {
const flushBlockReplyBufferResult = ctx.flushBlockReplyBuffer();
finalizeAgentEnd();
if (isPromiseLike<void>(flushBlockReplyBufferResult)) {
return flushBlockReplyBufferResult.then(() => flushPendingMediaAndChannel());
return flushBlockReplyBufferResult
.then(() => flushPendingMediaAndChannel())
.then(() => emitLifecycleTerminal());
}
const flushPendingMediaAndChannelResult = flushPendingMediaAndChannel();
if (isPromiseLike<void>(flushPendingMediaAndChannelResult)) {
return flushPendingMediaAndChannelResult;
return flushPendingMediaAndChannelResult.then(() => emitLifecycleTerminal());
}
emitLifecycleTerminal();
}


@@ -4,6 +4,7 @@ import {
hasNonzeroUsage,
derivePromptTokens,
deriveSessionTotalTokens,
toOpenAiChatCompletionsUsage,
} from "./usage.js";
describe("normalizeUsage", () => {
@@ -146,6 +147,90 @@ describe("normalizeUsage", () => {
});
});
describe("toOpenAiChatCompletionsUsage", () => {
it("uses max(component sum, aggregate total) when breakdown is partial", () => {
const usage = normalizeUsage({ output_tokens: 20, total_tokens: 100 });
expect(toOpenAiChatCompletionsUsage(usage)).toEqual({
prompt_tokens: 0,
completion_tokens: 20,
total_tokens: 100,
});
});
it("uses component sum when it exceeds aggregate total", () => {
expect(
toOpenAiChatCompletionsUsage({
input: 30,
output: 40,
total: 50,
}),
).toEqual({
prompt_tokens: 30,
completion_tokens: 40,
total_tokens: 70,
});
});
it("uses aggregate total when only total is present", () => {
const usage = normalizeUsage({ total_tokens: 42 });
expect(toOpenAiChatCompletionsUsage(usage)).toEqual({
prompt_tokens: 0,
completion_tokens: 0,
total_tokens: 42,
});
});
it("returns zeros for undefined usage", () => {
expect(toOpenAiChatCompletionsUsage(undefined)).toEqual({
prompt_tokens: 0,
completion_tokens: 0,
total_tokens: 0,
});
});
it("raises total_tokens with aggregate when cache write is excluded from prompt sum", () => {
expect(
toOpenAiChatCompletionsUsage({
input: 10,
output: 5,
cacheWrite: 100,
total: 200,
}),
).toEqual({
prompt_tokens: 10,
completion_tokens: 5,
total_tokens: 200,
});
});
it("clamps negative completion before deriving total_tokens", () => {
expect(
toOpenAiChatCompletionsUsage({
input: 3,
output: -5,
}),
).toEqual({
prompt_tokens: 3,
completion_tokens: 0,
total_tokens: 3,
});
});
it("preserves aggregate total when components are partially negative", () => {
expect(
toOpenAiChatCompletionsUsage({
input: 3,
output: -5,
total: 7,
}),
).toEqual({
prompt_tokens: 3,
completion_tokens: 0,
total_tokens: 7,
});
});
});
describe("hasNonzeroUsage", () => {
it("returns true when cache read is nonzero", () => {
const usage = { cacheRead: 100 };


@@ -143,6 +143,41 @@ export function normalizeUsage(raw?: UsageLike | null): NormalizedUsage | undefi
};
}
/**
* Maps normalized usage to OpenAI Chat Completions `usage` fields.
*
* `prompt_tokens` is input + cacheRead (cache write is excluded to match the
* OpenAI-style breakdown used by the compat endpoint).
*
* `total_tokens` is the greater of the component sum and aggregate `total` when
* present, so a partial breakdown cannot discard a valid upstream total.
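*
* For example, `{ output: 20, total: 100 }` maps to `prompt_tokens: 0`,
* `completion_tokens: 20`, `total_tokens: 100`, and `{ input: 10, output: 5,
* cacheWrite: 100, total: 200 }` keeps `total_tokens: 200` even though the
* cache write is excluded from `prompt_tokens`.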
*/
export function toOpenAiChatCompletionsUsage(usage: NormalizedUsage | undefined): {
prompt_tokens: number;
completion_tokens: number;
total_tokens: number;
} {
const input = usage?.input ?? 0;
const output = usage?.output ?? 0;
const cacheRead = usage?.cacheRead ?? 0;
const promptTokens = Math.max(0, input + cacheRead);
const completionTokens = Math.max(0, output);
const componentTotal = promptTokens + completionTokens;
const aggregateRaw = usage?.total;
const aggregateTotal =
typeof aggregateRaw === "number" && Number.isFinite(aggregateRaw)
? Math.max(0, aggregateRaw)
: undefined;
const totalTokens =
aggregateTotal !== undefined ? Math.max(componentTotal, aggregateTotal) : componentTotal;
return {
prompt_tokens: promptTokens,
completion_tokens: completionTokens,
total_tokens: totalTokens,
};
}
export function derivePromptTokens(usage?: {
input?: number;
cacheRead?: number;


@@ -700,6 +700,159 @@ describe("OpenAI-compatible HTTP API (e2e)", () => {
expect(msg.content).toBe("hello");
}
{
agentCommand.mockClear();
agentCommand.mockResolvedValueOnce({
payloads: [{ text: "usage basic" }],
meta: {
agentMeta: {
usage: {
input: 42,
output: 17,
},
},
},
} as never);
const json = await postSyncUserMessage("usage");
expect(json.usage).toEqual({
prompt_tokens: 42,
completion_tokens: 17,
total_tokens: 59,
});
}
{
agentCommand.mockClear();
agentCommand.mockResolvedValueOnce({
payloads: [{ text: "usage cache" }],
meta: {
agentMeta: {
usage: {
input: 10,
output: 5,
cacheRead: 20,
cacheWrite: 3,
},
},
},
} as never);
const json = await postSyncUserMessage("usage");
expect(json.usage).toEqual({
prompt_tokens: 30,
completion_tokens: 5,
total_tokens: 35,
});
}
{
agentCommand.mockClear();
agentCommand.mockResolvedValueOnce({
payloads: [{ text: "usage total" }],
meta: {
agentMeta: {
usage: {
input: 10,
output: 5,
total: 100,
},
},
},
} as never);
const json = await postSyncUserMessage("usage");
expect(json.usage).toEqual({
prompt_tokens: 10,
completion_tokens: 5,
total_tokens: 100,
});
}
{
agentCommand.mockClear();
agentCommand.mockResolvedValueOnce({
payloads: [{ text: "usage total only" }],
meta: {
agentMeta: {
usage: {
total: 123,
},
},
},
} as never);
const json = await postSyncUserMessage("usage");
expect(json.usage).toEqual({
prompt_tokens: 0,
completion_tokens: 0,
total_tokens: 123,
});
}
{
agentCommand.mockClear();
agentCommand.mockResolvedValueOnce({
payloads: [{ text: "usage non-finite" }],
meta: {
agentMeta: {
usage: {
input: Number.POSITIVE_INFINITY,
output: Number.NaN,
cacheRead: 2,
cacheWrite: Number.POSITIVE_INFINITY,
total: Number.NaN,
},
},
},
} as never);
const json = await postSyncUserMessage("usage");
expect(json.usage).toEqual({
prompt_tokens: 2,
completion_tokens: 0,
total_tokens: 2,
});
}
{
agentCommand.mockClear();
agentCommand.mockResolvedValueOnce({
payloads: [{ text: "usage non-finite aggregate fallback" }],
meta: {
agentMeta: {
usage: {
input: Number.POSITIVE_INFINITY,
output: Number.NaN,
total: 123,
},
},
},
} as never);
const json = await postSyncUserMessage("usage");
expect(json.usage).toEqual({
prompt_tokens: 0,
completion_tokens: 0,
total_tokens: 123,
});
}
{
agentCommand.mockClear();
agentCommand.mockResolvedValueOnce({
payloads: [{ text: "usage cache-write only" }],
meta: {
agentMeta: {
usage: {
cacheWrite: 10,
total: 10,
},
},
},
} as never);
const json = await postSyncUserMessage("usage");
expect(json.usage).toEqual({
prompt_tokens: 0,
completion_tokens: 0,
total_tokens: 10,
});
}
{
agentCommand.mockClear();
agentCommand.mockResolvedValueOnce({ payloads: [{ text: "" }] } as never);
@@ -802,6 +955,8 @@ describe("OpenAI-compatible HTTP API (e2e)", () => {
.filter((v): v is string => typeof v === "string")
.join("");
expect(allContent).toBe("hello");
const usageChunks = jsonChunks.filter((c) => "usage" in c);
expect(usageChunks).toHaveLength(0);
}
{
@@ -879,6 +1034,225 @@ describe("OpenAI-compatible HTTP API (e2e)", () => {
}
});
it("includes usage in final stream chunk when stream_options.include_usage=true", async () => {
const port = enabledPort;
agentCommand.mockClear();
agentCommand.mockImplementationOnce((async (opts: unknown) => {
const runId = (opts as { runId?: string } | undefined)?.runId ?? "";
emitAgentEvent({ runId, stream: "assistant", data: { delta: "he" } });
emitAgentEvent({ runId, stream: "assistant", data: { delta: "llo" } });
return {
payloads: [{ text: "hello" }],
meta: {
agentMeta: {
usage: {
input: 12,
output: 5,
cacheRead: 3,
cacheWrite: 0,
total: 20,
},
},
},
};
}) as never);
const res = await postChatCompletions(port, {
stream: true,
stream_options: { include_usage: true },
model: "openclaw",
messages: [{ role: "user", content: "hi" }],
});
expect(res.status).toBe(200);
const text = await res.text();
const data = parseSseDataLines(text);
expect(data[data.length - 1]).toBe("[DONE]");
const jsonChunks = data
.filter((d) => d !== "[DONE]")
.map((d) => JSON.parse(d) as Record<string, unknown>);
const usageChunk = jsonChunks.find((chunk) => "usage" in chunk);
expect(usageChunk?.usage).toEqual({
prompt_tokens: 15,
completion_tokens: 5,
total_tokens: 20,
});
expect(usageChunk?.choices).toEqual([]);
});
it("keeps aggregate-only usage total in final stream usage chunk", async () => {
const port = enabledPort;
agentCommand.mockClear();
agentCommand.mockImplementationOnce((async (opts: unknown) => {
const runId = (opts as { runId?: string } | undefined)?.runId ?? "";
emitAgentEvent({ runId, stream: "assistant", data: { delta: "hello" } });
return {
payloads: [{ text: "hello" }],
meta: {
agentMeta: {
usage: {
total: 123,
},
},
},
};
}) as never);
const res = await postChatCompletions(port, {
stream: true,
stream_options: { include_usage: true },
model: "openclaw",
messages: [{ role: "user", content: "hi" }],
});
expect(res.status).toBe(200);
const text = await res.text();
const data = parseSseDataLines(text);
expect(data[data.length - 1]).toBe("[DONE]");
const jsonChunks = data
.filter((d) => d !== "[DONE]")
.map((d) => JSON.parse(d) as Record<string, unknown>);
const usageChunk = jsonChunks.find((chunk) => "usage" in chunk);
expect(usageChunk?.usage).toEqual({
prompt_tokens: 0,
completion_tokens: 0,
total_tokens: 123,
});
});
it("finalizes stream when lifecycle end arrives before usage is available", async () => {
const port = enabledPort;
agentCommand.mockClear();
agentCommand.mockImplementationOnce(
((opts: unknown) =>
new Promise((resolve) => {
const runId = (opts as { runId?: string } | undefined)?.runId ?? "";
emitAgentEvent({ runId, stream: "assistant", data: { delta: "hello" } });
emitAgentEvent({ runId, stream: "lifecycle", data: { phase: "end" } });
setTimeout(() => {
resolve({
payloads: [{ text: "hello" }],
meta: {
agentMeta: {
usage: { input: 7, output: 3, total: 10 },
},
},
});
}, 100);
})) as never,
);
const res = await postChatCompletions(port, {
stream: true,
stream_options: { include_usage: true },
model: "openclaw",
messages: [{ role: "user", content: "hi" }],
});
expect(res.status).toBe(200);
const text = await res.text();
const data = parseSseDataLines(text);
expect(data[data.length - 1]).toBe("[DONE]");
const jsonChunks = data
.filter((d) => d !== "[DONE]")
.map((d) => JSON.parse(d) as Record<string, unknown>);
const usageChunk = jsonChunks.find((chunk) => "usage" in chunk);
expect(usageChunk?.usage).toEqual({
prompt_tokens: 7,
completion_tokens: 3,
total_tokens: 10,
});
});
it(
"cleans up usage-enabled stream when client disconnects before usage arrives",
{ timeout: 15_000 },
async () => {
const port = enabledPort;
let serverAbortSignal: AbortSignal | undefined;
agentCommand.mockClear();
agentCommand.mockImplementationOnce(
(opts: unknown) =>
new Promise<undefined>((resolve) => {
const runId = (opts as { runId?: string } | undefined)?.runId ?? "";
const signal = (opts as { abortSignal?: AbortSignal } | undefined)?.abortSignal;
serverAbortSignal = signal;
emitAgentEvent({ runId, stream: "assistant", data: { delta: "hello" } });
emitAgentEvent({ runId, stream: "lifecycle", data: { phase: "end" } });
if (signal?.aborted) {
resolve(undefined);
return;
}
signal?.addEventListener("abort", () => resolve(undefined), { once: true });
}),
);
const clientReq = http.request({
hostname: "127.0.0.1",
port,
path: "/v1/chat/completions",
method: "POST",
headers: {
"content-type": "application/json",
authorization: "Bearer secret",
},
});
clientReq.on("error", () => {});
clientReq.end(
JSON.stringify({
stream: true,
stream_options: { include_usage: true },
model: "openclaw",
messages: [{ role: "user", content: "hi" }],
}),
);
await vi.waitFor(() => {
expect(agentCommand).toHaveBeenCalledTimes(1);
});
clientReq.destroy();
await vi.waitFor(
() => {
expect(serverAbortSignal?.aborted).toBe(true);
},
{ timeout: 5_000, interval: 50 },
);
},
);
it("does not block stream finalization on usage when include_usage is not requested", async () => {
const port = enabledPort;
agentCommand.mockClear();
agentCommand.mockImplementationOnce(
((opts: unknown) =>
new Promise(() => {
const runId = (opts as { runId?: string } | undefined)?.runId ?? "";
emitAgentEvent({ runId, stream: "assistant", data: { delta: "hello" } });
emitAgentEvent({ runId, stream: "lifecycle", data: { phase: "end" } });
})) as never,
);
const res = await postChatCompletions(port, {
stream: true,
model: "openclaw",
messages: [{ role: "user", content: "hi" }],
});
expect(res.status).toBe(200);
const text = await res.text();
const data = parseSseDataLines(text);
expect(data[data.length - 1]).toBe("[DONE]");
const jsonChunks = data
.filter((d) => d !== "[DONE]")
.map((d) => JSON.parse(d) as Record<string, unknown>);
const usageChunks = jsonChunks.filter((chunk) => "usage" in chunk);
expect(usageChunks).toHaveLength(0);
});
it("treats shared-secret bearer callers as owner operators", async () => {
const port = await getFreePort();
const server = await startTokenServer(port);


@@ -1,6 +1,7 @@
import { randomUUID } from "node:crypto";
import type { IncomingMessage, ServerResponse } from "node:http";
import type { ImageContent } from "../agents/command/types.js";
import { normalizeUsage, toOpenAiChatCompletionsUsage } from "../agents/usage.js";
import { createDefaultDeps } from "../cli/deps.js";
import { agentCommandFromIngress } from "../commands/agent.js";
import type { GatewayHttpChatCompletionsConfig } from "../config/types.gateway.js";
@@ -57,6 +58,8 @@ type OpenAiChatMessage = {
type OpenAiChatCompletionRequest = {
model?: unknown;
stream?: unknown;
// Naming/style reference: src/agents/openai-transport-stream.ts:1262-1273
stream_options?: unknown;
messages?: unknown;
user?: unknown;
};
@@ -163,6 +166,40 @@ function writeAssistantContentChunk(
});
}
function writeAssistantStopChunk(res: ServerResponse, params: { runId: string; model: string }) {
writeSse(res, {
id: params.runId,
object: "chat.completion.chunk",
created: Math.floor(Date.now() / 1000),
model: params.model,
choices: [
{
index: 0,
delta: {},
finish_reason: "stop",
},
],
});
}
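// The final usage chunk written when `stream_options.include_usage` is set:
// per the OpenAI Chat Completions wire format it carries an empty `choices`
// array alongside the aggregated `usage` totals.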
function writeUsageChunk(
res: ServerResponse,
params: {
runId: string;
model: string;
usage: { prompt_tokens: number; completion_tokens: number; total_tokens: number };
},
) {
writeSse(res, {
id: params.runId,
object: "chat.completion.chunk",
created: Math.floor(Date.now() / 1000),
model: params.model,
choices: [],
usage: params.usage,
});
}
function asMessages(val: unknown): OpenAiChatMessage[] {
return Array.isArray(val) ? (val as OpenAiChatMessage[]) : [];
}
@@ -421,6 +458,44 @@ function resolveAgentResponseText(result: unknown): string {
return content || "No response from OpenClaw.";
}
type AgentUsageMeta = {
input?: number;
output?: number;
cacheRead?: number;
cacheWrite?: number;
total?: number;
};
function resolveRawAgentUsage(result: unknown): AgentUsageMeta | undefined {
return (
result as {
meta?: {
agentMeta?: {
usage?: AgentUsageMeta;
};
};
} | null
)?.meta?.agentMeta?.usage;
}
function resolveChatCompletionUsage(result: unknown): {
prompt_tokens: number;
completion_tokens: number;
total_tokens: number;
} {
return toOpenAiChatCompletionsUsage(normalizeUsage(resolveRawAgentUsage(result)));
}
function resolveIncludeUsageForStreaming(payload: OpenAiChatCompletionRequest): boolean {
// Keep parsing aligned with OpenAI wire-format field names.
// Flow reference: src/agents/openai-transport-stream.ts:1262-1273
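// A request opts in with e.g. { "stream": true, "stream_options": { "include_usage": true } };
// anything other than a literal `include_usage: true` is treated as opted out.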
const streamOptions = payload.stream_options;
if (!streamOptions || typeof streamOptions !== "object" || Array.isArray(streamOptions)) {
return false;
}
return (streamOptions as { include_usage?: unknown }).include_usage === true;
}
export async function handleOpenAiHttpRequest(
req: IncomingMessage,
res: ServerResponse,
@@ -451,6 +526,7 @@ export async function handleOpenAiHttpRequest(
const payload = coerceRequest(handled.body);
const stream = Boolean(payload.stream);
const streamIncludeUsage = stream && resolveIncludeUsageForStreaming(payload);
const model = typeof payload.model === "string" ? payload.model : "openclaw";
const user = typeof payload.user === "string" ? payload.user : undefined;
@@ -526,6 +602,7 @@ export async function handleOpenAiHttpRequest(
}
const content = resolveAgentResponseText(result);
const usage = resolveChatCompletionUsage(result);
sendJson(res, 200, {
id: runId,
@@ -539,7 +616,7 @@ export async function handleOpenAiHttpRequest(
finish_reason: "stop",
},
],
usage: { prompt_tokens: 0, completion_tokens: 0, total_tokens: 0 },
usage,
});
} catch (err) {
if (abortController.signal.aborted) {
@@ -558,10 +635,45 @@ export async function handleOpenAiHttpRequest(
setSseHeaders(res);
let wroteRole = false;
let wroteStopChunk = false;
let sawAssistantDelta = false;
let finalUsage:
| {
prompt_tokens: number;
completion_tokens: number;
total_tokens: number;
}
| undefined;
let finalizeRequested = false;
let closed = false;
let stopWatchingDisconnect = () => {};
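// Finalization is double-gated: a terminal lifecycle event must request it,
// and when the caller asked for usage via stream_options.include_usage the
// stream also waits for finalUsage (resolved from the agent command result,
// or the zeroed error fallback) before writing [DONE].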
const maybeFinalize = () => {
if (closed || !finalizeRequested) {
return;
}
if (streamIncludeUsage && !finalUsage) {
return;
}
closed = true;
stopWatchingDisconnect();
unsubscribe();
if (!wroteStopChunk) {
writeAssistantStopChunk(res, { runId, model });
wroteStopChunk = true;
}
if (streamIncludeUsage && finalUsage) {
writeUsageChunk(res, { runId, model, usage: finalUsage });
}
writeDone(res);
res.end();
};
const requestFinalize = () => {
finalizeRequested = true;
maybeFinalize();
};
const unsubscribe = onAgentEvent((evt) => {
if (evt.runId !== runId) {
return;
@@ -594,11 +706,7 @@ export async function handleOpenAiHttpRequest(
if (evt.stream === "lifecycle") {
const phase = evt.data?.phase;
if (phase === "end" || phase === "error") {
closed = true;
stopWatchingDisconnect();
unsubscribe();
writeDone(res);
res.end();
requestFinalize();
}
}
});
@@ -616,6 +724,8 @@ export async function handleOpenAiHttpRequest(
return;
}
finalUsage = resolveChatCompletionUsage(result);
if (!sawAssistantDelta) {
if (!wroteRole) {
wroteRole = true;
@@ -632,6 +742,7 @@ export async function handleOpenAiHttpRequest(
finishReason: null,
});
}
requestFinalize();
} catch (err) {
if (closed || abortController.signal.aborted) {
return;
@@ -643,18 +754,25 @@ export async function handleOpenAiHttpRequest(
content: "Error: internal error",
finishReason: "stop",
});
wroteStopChunk = true;
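// Fall back to zeroed usage so a usage-gated stream can still finalize after
// an internal error instead of waiting for totals that will never arrive.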
finalUsage = {
prompt_tokens: 0,
completion_tokens: 0,
total_tokens: 0,
};
emitAgentEvent({
runId,
stream: "lifecycle",
data: { phase: "error" },
});
requestFinalize();
} finally {
if (!closed) {
closed = true;
stopWatchingDisconnect();
unsubscribe();
writeDone(res);
res.end();
emitAgentEvent({
runId,
stream: "lifecycle",
data: { phase: "end" },
});
}
}
})();