From 5e8db468ff03492ba2a4958e3d7ddfbc232a4edc Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Fri, 27 Mar 2026 21:13:38 +0530 Subject: [PATCH] fix(agents): preserve embedded auth on HTTP fallback --- src/agents/openai-ws-stream.ts | 17 +++++++++++------ src/agents/pi-embedded-runner/run/attempt.ts | 7 +++---- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/src/agents/openai-ws-stream.ts b/src/agents/openai-ws-stream.ts index 53d8271c9d9..6cd39cf093c 100644 --- a/src/agents/openai-ws-stream.ts +++ b/src/agents/openai-ws-stream.ts @@ -299,7 +299,7 @@ export function createOpenAIWebSocketStreamFn( const run = async () => { const transport = resolveWsTransport(options); if (transport === "sse") { - return fallbackToHttp(model, context, options, eventStream, opts.signal); + return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal); } // ── 1. Get or create session state ────────────────────────────────── @@ -339,7 +339,7 @@ export function createOpenAIWebSocketStreamFn( `[ws-stream] WebSocket connect failed for session=${sessionId}; falling back to HTTP. error=${String(connErr)}`, ); // Fall back to HTTP immediately - return fallbackToHttp(model, context, options, eventStream, opts.signal); + return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal); } } @@ -356,7 +356,7 @@ export function createOpenAIWebSocketStreamFn( /* ignore */ } wsRegistry.delete(sessionId); - return fallbackToHttp(model, context, options, eventStream, opts.signal); + return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal); } const signal = opts.signal ?? (options as WsOptions | undefined)?.signal; @@ -401,7 +401,7 @@ export function createOpenAIWebSocketStreamFn( log.warn( `[ws-stream] reconnect after warm-up failed for session=${sessionId}; falling back to HTTP. error=${String(reconnectErr)}`, ); - return fallbackToHttp(model, context, options, eventStream, opts.signal); + return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal); } } } @@ -506,7 +506,7 @@ export function createOpenAIWebSocketStreamFn( /* ignore */ } wsRegistry.delete(sessionId); - return fallbackToHttp(model, context, options, eventStream, opts.signal); + return fallbackToHttp(model, context, options, apiKey, eventStream, opts.signal); } eventStream.push({ @@ -616,10 +616,15 @@ async function fallbackToHttp( model: Parameters[0], context: Parameters[1], options: Parameters[2], + apiKey: string, eventStream: AssistantMessageEventStreamLike, signal?: AbortSignal, ): Promise { - const mergedOptions = signal ? { ...options, signal } : options; + const mergedOptions = { + ...options, + apiKey, + ...(signal ? { signal } : {}), + }; const httpStream = openAIWsStreamDeps.streamSimple(model, context, mergedOptions); for await (const event of httpStream) { eventStream.push(event); diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 036c587a93e..e396a049118 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -1,7 +1,6 @@ import fs from "node:fs/promises"; import os from "node:os"; import type { AgentMessage } from "@mariozechner/pi-agent-core"; -import { streamSimple } from "@mariozechner/pi-ai"; import { createAgentSession, DefaultResourceLoader, @@ -844,6 +843,7 @@ export async function runEmbeddedAttempt( workspaceDir: params.workspaceDir, }); + const defaultSessionStreamFn = activeSession.agent.streamFn; const providerStreamFn = registerProviderStreamForModel({ model: params.model, cfg: params.config, @@ -865,15 +865,14 @@ export async function runEmbeddedAttempt( }); } else { log.warn(`[ws-stream] no API key for provider=${params.provider}; using HTTP transport`); - activeSession.agent.streamFn = streamSimple; + activeSession.agent.streamFn = defaultSessionStreamFn; } } else if (params.model.provider === "anthropic-vertex") { // Anthropic Vertex AI: inject AnthropicVertex client into pi-ai's // streamAnthropic for GCP IAM auth instead of Anthropic API keys. activeSession.agent.streamFn = createAnthropicVertexStreamFnForModel(params.model); } else { - // Force a stable streamFn reference so vitest can reliably mock @mariozechner/pi-ai. - activeSession.agent.streamFn = streamSimple; + activeSession.agent.streamFn = defaultSessionStreamFn; } const { effectiveExtraParams } = applyExtraParamsToAgent(