From b68f3de91b305c61dce6db7a869cece196917843 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 1 May 2026 11:51:45 +0100 Subject: [PATCH] fix(agent): honor explicit OpenAI SSE transport --- src/agents/openai-ws-stream.test.ts | 28 ++++++++++++++++++++ src/agents/openai-ws-stream.ts | 1 + src/agents/pi-embedded-runner/run/attempt.ts | 18 +++++++++---- 3 files changed, 42 insertions(+), 5 deletions(-) diff --git a/src/agents/openai-ws-stream.test.ts b/src/agents/openai-ws-stream.test.ts index b680cc20372..20309599434 100644 --- a/src/agents/openai-ws-stream.test.ts +++ b/src/agents/openai-ws-stream.test.ts @@ -1917,6 +1917,7 @@ describe("createOpenAIWebSocketStreamFn", () => { releaseWsSession("sess-2"); releaseWsSession("sess-boundary"); releaseWsSession("sess-fallback"); + releaseWsSession("sess-explicit-sse"); releaseWsSession("sess-boundary-http-fallback"); releaseWsSession("sess-full-context-replay"); releaseWsSession("sess-encrypted-full-context-replay"); @@ -2681,6 +2682,33 @@ describe("createOpenAIWebSocketStreamFn", () => { } }); + it("ends the HTTP fallback stream when explicit SSE transport is selected", async () => { + const streamFn = createOpenAIWebSocketStreamFn("sk-test", "sess-explicit-sse"); + const stream = await resolveStream( + streamFn( + modelStub as Parameters[0], + contextStub as Parameters[1], + { transport: "sse" } as Parameters[2], + ), + ); + + await expect( + Promise.race([ + ( + stream as unknown as { + result: () => Promise<{ content?: Array<{ text?: string }> }>; + } + ).result(), + new Promise((_, reject) => + setTimeout(() => reject(new Error("SSE fallback result timed out")), 100), + ), + ]), + ).resolves.toMatchObject({ + content: [{ text: "http fallback response" }], + }); + expect(streamSimpleCalls).toHaveLength(1); + }); + it("falls back to HTTP when WebSocket errors before any output in auto mode", async () => { const streamFn = createOpenAIWebSocketStreamFn("sk-test", "sess-runtime-fallback"); const stream = streamFn( diff --git a/src/agents/openai-ws-stream.ts b/src/agents/openai-ws-stream.ts index 365f36a0f05..12849375fe4 100644 --- a/src/agents/openai-ws-stream.ts +++ b/src/agents/openai-ws-stream.ts @@ -1346,6 +1346,7 @@ async function fallbackToHttp( } eventStream.push(event); } + eventStream.end(); } export const __testing = { diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index b25937eda23..a473e3f45e3 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -1691,11 +1691,19 @@ export async function runEmbeddedAttempt( agentDir, workspaceDir: effectiveWorkspace, }); - const shouldUseWebSocketTransport = shouldUseOpenAIWebSocketTransport({ - provider: params.provider, - modelApi: params.model.api, - modelBaseUrl: params.model.baseUrl, - }); + const hasExplicitSseTransport = [ + (params.streamParams as { transport?: unknown } | undefined)?.transport, + (params.model as { params?: { transport?: unknown } }).params?.transport, + ] + .map((value) => (typeof value === "string" ? value.trim().toLowerCase() : "")) + .includes("sse"); + const shouldUseWebSocketTransport = + !hasExplicitSseTransport && + shouldUseOpenAIWebSocketTransport({ + provider: params.provider, + modelApi: params.model.api, + modelBaseUrl: params.model.baseUrl, + }); const wsApiKey = shouldUseWebSocketTransport ? await resolveEmbeddedAgentApiKey({ provider: params.provider,