fix(openai): skip malformed empty SSE frames

Peter Steinberger
2026-04-29 03:28:33 +01:00
parent 09e2cf1103
commit c2e3b6e6f8
3 changed files with 165 additions and 0 deletions


@@ -70,6 +70,7 @@ Docs: https://docs.openclaw.ai
- CLI/model probes: request trusted operator scope for `infer model run --gateway --model <provider/model>` so Gateway raw-model smoke tests can use one-off provider/model overrides instead of being rejected before provider auth resolution. Fixes #73759. Thanks @chrislro.
- CLI/image describe: pass `--prompt` and `--timeout-ms` through `infer image describe` and `describe-many`, so custom vision instructions and slow local model budgets reach media-understanding providers such as Ollama, OpenAI, Google, and OpenRouter. Refs #63700. Thanks @cedricjanssens.
- Model selection: include the rejected provider/model ref and allowlist recovery hint when a stored session override is cleared, so local model selections such as Gemma GGUF variants do not fall back to the default with a generic message. Refs #71069. Thanks @CyberRaccoonTeam.
- OpenAI-compatible providers: drop malformed event-only or blank-data SSE frames before the OpenAI SDK stream parser sees them, so proxies that split `event:` from `data:` no longer crash streaming runs with `Unexpected end of JSON input` (see the frame sketch after this list). Fixes #52802. Thanks @LyHug.
- Local model prompt caching: keep stable Project Context above volatile channel/session prompt guidance and stop embedding current channel names in the message tool description, so Ollama, MLX, llama.cpp, and other prefix-cache backends avoid unnecessary full prompt reprocessing across channel turns. Fixes #40256; supersedes #40296. Thanks @rhclaw and @sriram369.
- Gateway/OpenAI-compatible API: guard provider policy lookup against runtime providers with non-array `models` values, so `/v1/chat/completions` no longer fails with `provider?.models?.some is not a function`. Fixes #66744; carries forward #66761. Thanks @MightyMoud, @MukundaKatta.
- WhatsApp/Web: pass explicit Baileys socket timings into every WhatsApp Web socket and expose `web.whatsapp.*` keepalive, connect, and query timeout settings so unstable networks can avoid repeated 408 disconnect and opening-handshake timeout loops. Fixes #56365. (#73580) Thanks @velvet-shark.
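
For context, a minimal sketch of the frame shapes involved (hypothetical payloads): a proxy that splits `event:` from `data:` can emit an event-only block followed by the real data block, e.g.

event: message

data: {"ok": true}

The first block carries no data payload, so the OpenAI SDK's stream parser previously ran JSON.parse on an empty string and threw `Unexpected end of JSON input`; the sanitizer added in this commit forwards only the second block.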


@@ -1,4 +1,5 @@
import type { Model } from "@mariozechner/pi-ai";
import { Stream } from "openai/streaming";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
const {
@@ -138,6 +139,71 @@ describe("buildGuardedModelFetch", () => {
});
});
it("drops event-only SSE frames before the OpenAI SDK stream parser sees them", async () => {
const encoder = new TextEncoder();
fetchWithSsrFGuardMock.mockResolvedValue({
response: new Response(
new ReadableStream({
start(controller) {
controller.enqueue(encoder.encode("event: message\n\n"));
controller.enqueue(encoder.encode('data: {"ok": true}\n\n'));
controller.close();
},
}),
{ headers: { "content-type": "text/event-stream" } },
),
finalUrl: "https://api.openai.com/v1/responses",
release: vi.fn(async () => undefined),
});
const { buildGuardedModelFetch } = await import("./provider-transport-fetch.js");
const model = {
id: "gpt-5.4",
provider: "openai",
api: "openai-responses",
baseUrl: "https://api.openai.com/v1",
} as unknown as Model<"openai-responses">;
const response = await buildGuardedModelFetch(model)("https://api.openai.com/v1/responses", {
method: "POST",
});
const items = [];
for await (const item of Stream.fromSSEResponse(response, new AbortController())) {
items.push(item);
}
expect(items).toEqual([{ ok: true }]);
});
it("drops whitespace-only SSE data frames with CRLF delimiters", async () => {
fetchWithSsrFGuardMock.mockResolvedValue({
response: new Response('event: message\r\ndata: \r\n\r\ndata: {"ok": true}\r\n\r\n', {
headers: { "content-type": "text/event-stream" },
}),
finalUrl: "https://api.openai.com/v1/chat/completions",
release: vi.fn(async () => undefined),
});
const { buildGuardedModelFetch } = await import("./provider-transport-fetch.js");
const model = {
id: "gpt-5.4",
provider: "openai",
api: "openai-completions",
baseUrl: "https://api.openai.com/v1",
} as unknown as Model<"openai-completions">;
const response = await buildGuardedModelFetch(model)(
"https://api.openai.com/v1/chat/completions",
{ method: "POST" },
);
const items = [];
for await (const item of Stream.fromSSEResponse(response, new AbortController())) {
items.push(item);
}
expect(items).toEqual([{ ok: true }]);
});
describe("long retry-after handling", () => {
const anthropicModel = {
id: "sonnet-4.6",


@@ -10,6 +10,103 @@ import {
const DEFAULT_MAX_SDK_RETRY_WAIT_SECONDS = 60;
function hasReadableSseData(block: string): boolean {
  const dataLines = block
    .split(/\r\n|\n|\r/)
    .filter((line) => line === "data" || line.startsWith("data:"))
    .map((line) => {
      if (line === "data") {
        return "";
      }
      const value = line.slice("data:".length);
      return value.startsWith(" ") ? value.slice(1) : value;
    });
  return dataLines.length > 0 && dataLines.join("\n").trim().length > 0;
}
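// Example behavior (hypothetical inputs, for illustration only):
//   hasReadableSseData("event: message")      // false: no data field at all
//   hasReadableSseData("data:")               // false: blank data payload
//   hasReadableSseData('data: {"ok": true}')  // true: real payload survives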
function findSseEventBoundary(buffer: string): { index: number; length: number } | undefined {
  let best: { index: number; length: number } | undefined;
  for (const delimiter of ["\r\n\r\n", "\n\n", "\r\r"]) {
    const index = buffer.indexOf(delimiter);
    if (index === -1) {
      continue;
    }
    if (!best || index < best.index) {
      best = { index, length: delimiter.length };
    }
  }
  return best;
}
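// Example (hypothetical buffer): for 'event: a\n\ndata: b\r\n\r\n' the LF pair at
// index 8 precedes the CRLF pair at index 17, so the earliest complete block wins
// even when a stream mixes line-ending conventions.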
function sanitizeOpenAISdkSseResponse(response: Response): Response {
  const contentType = response.headers.get("content-type") ?? "";
  if (!response.body || !/\btext\/event-stream\b/i.test(contentType)) {
    return response;
  }
  const source = response.body;
  const decoder = new TextDecoder();
  const encoder = new TextEncoder();
  let reader: ReadableStreamDefaultReader<Uint8Array> | undefined;
  let buffer = "";
  const enqueueSanitized = (
    controller: ReadableStreamDefaultController<Uint8Array>,
    text: string,
  ) => {
    buffer += text;
    for (;;) {
      const boundary = findSseEventBoundary(buffer);
      if (!boundary) {
        return;
      }
      const block = buffer.slice(0, boundary.index);
      const separator = buffer.slice(boundary.index, boundary.index + boundary.length);
      buffer = buffer.slice(boundary.index + boundary.length);
      // OpenAI's SDK currently tries to JSON.parse event-only or blank-data SSE
      // messages. Drop those malformed keepalive-style blocks before it parses.
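      // For instance (hypothetical frames): "event: message" alone, or "data:"
      // with nothing after it, would otherwise reach JSON.parse as "" and throw
      // "Unexpected end of JSON input".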
      if (hasReadableSseData(block)) {
        controller.enqueue(encoder.encode(`${block}${separator}`));
      }
    }
  };
  const sanitizedBody = new ReadableStream<Uint8Array>({
    start() {
      reader = source.getReader();
    },
    async pull(controller) {
      try {
        const chunk = await reader?.read();
        if (!chunk || chunk.done) {
          const tail = decoder.decode();
          if (tail) {
            enqueueSanitized(controller, tail);
          }
          if (buffer && hasReadableSseData(buffer)) {
            controller.enqueue(encoder.encode(buffer));
          }
          buffer = "";
          controller.close();
          return;
        }
        enqueueSanitized(controller, decoder.decode(chunk.value, { stream: true }));
      } catch (error) {
        controller.error(error);
      }
    },
    async cancel(reason) {
      await reader?.cancel(reason);
    },
  });
  return new Response(sanitizedBody, {
    status: response.status,
    statusText: response.statusText,
    headers: response.headers,
  });
}
function parseRetryAfterSeconds(headers: Headers): number | undefined {
  const retryAfterMs = headers.get("retry-after-ms");
  if (retryAfterMs) {
@@ -218,6 +315,7 @@ export function buildGuardedModelFetch(model: Model<Api>, timeoutMs?: number): t
        headers,
      });
    }
    response = sanitizeOpenAISdkSseResponse(response);
    return buildManagedResponse(response, result.release);
  };
}
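
For a quick sense of the effect, a minimal usage sketch calling the sanitizer directly (hypothetical standalone snippet; in the diff the helper is module-internal and applied inside the guarded fetch):

const malformed = new Response('event: message\n\ndata: {"ok": true}\n\n', {
  headers: { "content-type": "text/event-stream" },
});
const sanitized = sanitizeOpenAISdkSseResponse(malformed);
// The event-only block is dropped before the OpenAI SDK ever sees it.
console.log(await sanitized.text()); // 'data: {"ok": true}\n\n'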