From c2e3b6e6f8190d13aa45a88edfdae1d3cdfdcdad Mon Sep 17 00:00:00 2001
From: Peter Steinberger
Date: Wed, 29 Apr 2026 03:28:33 +0100
Subject: [PATCH] fix(openai): skip malformed empty SSE frames

---
 CHANGELOG.md                                |  1 +
 src/agents/provider-transport-fetch.test.ts | 66 ++++++++++++++
 src/agents/provider-transport-fetch.ts      | 98 +++++++++++++++++++++
 3 files changed, 165 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 077f90dd81e..07b3f515bb7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -70,6 +70,7 @@ Docs: https://docs.openclaw.ai
 - CLI/model probes: request trusted operator scope for `infer model run --gateway --model ` so Gateway raw model smokes can use one-off provider/model overrides instead of being rejected before provider auth resolution. Fixes #73759. Thanks @chrislro.
 - CLI/image describe: pass `--prompt` and `--timeout-ms` through `infer image describe` and `describe-many`, so custom vision instructions and slow local model budgets reach media-understanding providers such as Ollama, OpenAI, Google, and OpenRouter. Refs #63700. Thanks @cedricjanssens.
 - Model selection: include the rejected provider/model ref and allowlist recovery hint when a stored session override is cleared, so local model selections such as Gemma GGUF variants do not fall back to the default with a generic message. Refs #71069. Thanks @CyberRaccoonTeam.
+- OpenAI-compatible providers: drop malformed event-only or blank-data SSE frames before the OpenAI SDK stream parser sees them, so proxies that split `event:` from `data:` no longer crash streaming runs with `Unexpected end of JSON input`. Fixes #52802. Thanks @LyHug.
 - Local model prompt caching: keep stable Project Context above volatile channel/session prompt guidance and stop embedding current channel names in the message tool description, so Ollama, MLX, llama.cpp, and other prefix-cache backends avoid avoidable full prompt reprocessing across channel turns. Fixes #40256; supersedes #40296. Thanks @rhclaw and @sriram369.
 - Gateway/OpenAI-compatible API: guard provider policy lookup against runtime providers with non-array `models` values, so `/v1/chat/completions` no longer fails with `provider?.models?.some is not a function`. Fixes #66744; carries forward #66761. Thanks @MightyMoud, @MukundaKatta.
 - WhatsApp/Web: pass explicit Baileys socket timings into every WhatsApp Web socket and expose `web.whatsapp.*` keepalive, connect, and query timeout settings so unstable networks can avoid repeated 408 disconnect and opening-handshake timeout loops. Fixes #56365. (#73580) Thanks @velvet-shark.
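
Context for the CHANGELOG entry above (illustration only, not part of the diff): when a proxy sends the `event:` field as its own SSE frame, that frame carries no `data:` lines, so the payload a spec-style parser extracts is the empty string, and `JSON.parse("")` is precisely what raises `Unexpected end of JSON input`. A minimal sketch in plain Node/TypeScript, assuming no project code:

    // An event-only frame has no data lines, so the extracted payload is "".
    const frame = "event: message\n\n";
    const payload = frame
      .split("\n")
      .filter((line) => line.startsWith("data:"))
      .join("\n"); // "" for this frame
    JSON.parse(payload); // throws SyntaxError: Unexpected end of JSON input
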
diff --git a/src/agents/provider-transport-fetch.test.ts b/src/agents/provider-transport-fetch.test.ts
index c60e6529887..da1969bf698 100644
--- a/src/agents/provider-transport-fetch.test.ts
+++ b/src/agents/provider-transport-fetch.test.ts
@@ -1,4 +1,5 @@
 import type { Model } from "@mariozechner/pi-ai";
+import { Stream } from "openai/streaming";
 import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
 
 const {
@@ -138,6 +139,71 @@ describe("buildGuardedModelFetch", () => {
     });
   });
 
+  it("drops event-only SSE frames before the OpenAI SDK stream parser sees them", async () => {
+    const encoder = new TextEncoder();
+    fetchWithSsrFGuardMock.mockResolvedValue({
+      response: new Response(
+        new ReadableStream({
+          start(controller) {
+            controller.enqueue(encoder.encode("event: message\n\n"));
+            controller.enqueue(encoder.encode('data: {"ok": true}\n\n'));
+            controller.close();
+          },
+        }),
+        { headers: { "content-type": "text/event-stream" } },
+      ),
+      finalUrl: "https://api.openai.com/v1/responses",
+      release: vi.fn(async () => undefined),
+    });
+
+    const { buildGuardedModelFetch } = await import("./provider-transport-fetch.js");
+    const model = {
+      id: "gpt-5.4",
+      provider: "openai",
+      api: "openai-responses",
+      baseUrl: "https://api.openai.com/v1",
+    } as unknown as Model<"openai-responses">;
+
+    const response = await buildGuardedModelFetch(model)("https://api.openai.com/v1/responses", {
+      method: "POST",
+    });
+    const items = [];
+    for await (const item of Stream.fromSSEResponse(response, new AbortController())) {
+      items.push(item);
+    }
+
+    expect(items).toEqual([{ ok: true }]);
+  });
+
+  it("drops whitespace-only SSE data frames with CRLF delimiters", async () => {
+    fetchWithSsrFGuardMock.mockResolvedValue({
+      response: new Response('event: message\r\ndata: \r\n\r\ndata: {"ok": true}\r\n\r\n', {
+        headers: { "content-type": "text/event-stream" },
+      }),
+      finalUrl: "https://api.openai.com/v1/chat/completions",
+      release: vi.fn(async () => undefined),
+    });
+
+    const { buildGuardedModelFetch } = await import("./provider-transport-fetch.js");
+    const model = {
+      id: "gpt-5.4",
+      provider: "openai",
+      api: "openai-completions",
+      baseUrl: "https://api.openai.com/v1",
+    } as unknown as Model<"openai-completions">;
+
+    const response = await buildGuardedModelFetch(model)(
+      "https://api.openai.com/v1/chat/completions",
+      { method: "POST" },
+    );
+    const items = [];
+    for await (const item of Stream.fromSSEResponse(response, new AbortController())) {
+      items.push(item);
+    }
+
+    expect(items).toEqual([{ ok: true }]);
+  });
+
   describe("long retry-after handling", () => {
     const anthropicModel = {
       id: "sonnet-4.6",
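
Also outside the diff, a sketch of the expected behavior of the two helpers added in provider-transport-fetch.ts below, assuming the definitions in this patch. Per the SSE spec, a `data:` field may omit the space after the colon and a bare `data` line contributes an empty string, which is why `hasReadableSseData` normalizes both forms before checking for a non-blank payload:

    hasReadableSseData("event: message");       // false — no data field at all
    hasReadableSseData("data: \ndata:");        // false — payload is whitespace only
    hasReadableSseData('data: {"ok": true}');   // true  — real payload survives
    findSseEventBoundary("data: x\n\nrest");    // { index: 7, length: 2 }
    findSseEventBoundary("a\r\n\r\nb\n\nc");    // { index: 1, length: 4 } — earliest boundary wins
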
diff --git a/src/agents/provider-transport-fetch.ts b/src/agents/provider-transport-fetch.ts
index 745f89e2c58..4dfc1b44087 100644
--- a/src/agents/provider-transport-fetch.ts
+++ b/src/agents/provider-transport-fetch.ts
@@ -10,6 +10,103 @@ import {
 
 const DEFAULT_MAX_SDK_RETRY_WAIT_SECONDS = 60;
 
+function hasReadableSseData(block: string): boolean {
+  const dataLines = block
+    .split(/\r\n|\n|\r/)
+    .filter((line) => line === "data" || line.startsWith("data:"))
+    .map((line) => {
+      if (line === "data") {
+        return "";
+      }
+      const value = line.slice("data:".length);
+      return value.startsWith(" ") ? value.slice(1) : value;
+    });
+  return dataLines.length > 0 && dataLines.join("\n").trim().length > 0;
+}
+
+function findSseEventBoundary(buffer: string): { index: number; length: number } | undefined {
+  let best: { index: number; length: number } | undefined;
+  for (const delimiter of ["\r\n\r\n", "\n\n", "\r\r"]) {
+    const index = buffer.indexOf(delimiter);
+    if (index === -1) {
+      continue;
+    }
+    if (!best || index < best.index) {
+      best = { index, length: delimiter.length };
+    }
+  }
+  return best;
+}
+
+function sanitizeOpenAISdkSseResponse(response: Response): Response {
+  const contentType = response.headers.get("content-type") ?? "";
+  if (!response.body || !/\btext\/event-stream\b/i.test(contentType)) {
+    return response;
+  }
+
+  const source = response.body;
+  const decoder = new TextDecoder();
+  const encoder = new TextEncoder();
+  let reader: ReadableStreamDefaultReader | undefined;
+  let buffer = "";
+
+  const enqueueSanitized = (
+    controller: ReadableStreamDefaultController,
+    text: string,
+  ) => {
+    buffer += text;
+    for (;;) {
+      const boundary = findSseEventBoundary(buffer);
+      if (!boundary) {
+        return;
+      }
+      const block = buffer.slice(0, boundary.index);
+      const separator = buffer.slice(boundary.index, boundary.index + boundary.length);
+      buffer = buffer.slice(boundary.index + boundary.length);
+      // OpenAI's SDK currently tries to JSON.parse event-only or blank-data SSE
+      // messages. Drop those malformed keepalive-style blocks before it parses.
+      if (hasReadableSseData(block)) {
+        controller.enqueue(encoder.encode(`${block}${separator}`));
+      }
+    }
+  };
+
+  const sanitizedBody = new ReadableStream({
+    start() {
+      reader = source.getReader();
+    },
+    async pull(controller) {
+      try {
+        const chunk = await reader?.read();
+        if (!chunk || chunk.done) {
+          const tail = decoder.decode();
+          if (tail) {
+            enqueueSanitized(controller, tail);
+          }
+          if (buffer && hasReadableSseData(buffer)) {
+            controller.enqueue(encoder.encode(buffer));
+          }
+          buffer = "";
+          controller.close();
+          return;
+        }
+        enqueueSanitized(controller, decoder.decode(chunk.value, { stream: true }));
+      } catch (error) {
+        controller.error(error);
+      }
+    },
+    async cancel(reason) {
+      await reader?.cancel(reason);
+    },
+  });
+
+  return new Response(sanitizedBody, {
+    status: response.status,
+    statusText: response.statusText,
+    headers: response.headers,
+  });
+}
+
 function parseRetryAfterSeconds(headers: Headers): number | undefined {
   const retryAfterMs = headers.get("retry-after-ms");
   if (retryAfterMs) {
@@ -218,6 +315,7 @@ export function buildGuardedModelFetch(model: Model, timeoutMs?: number): t
         headers,
       });
     }
+    response = sanitizeOpenAISdkSseResponse(response);
     return buildManagedResponse(response, result.release);
   };
 }
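
For reviewers who want to reproduce the original failure in isolation, a rough repro outside the test suite, assuming an openai-node version whose stream parser JSON.parses each frame's data payload (run under Node ESM with the `openai` package installed):

    import { Stream } from "openai/streaming";

    // The kind of proxy output this patch guards against: the event field
    // arrives as its own frame, with no data payload attached to it.
    const response = new Response('event: message\n\ndata: {"ok": true}\n\n', {
      headers: { "content-type": "text/event-stream" },
    });

    for await (const item of Stream.fromSSEResponse(response, new AbortController())) {
      // Without sanitizeOpenAISdkSseResponse in front, iteration rejects with
      // "Unexpected end of JSON input" on the event-only frame.
      console.log(item);
    }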