From 16fd9a9d5973420f326fb83fa66b4ba386a40859 Mon Sep 17 00:00:00 2001 From: Chunyue Wang <80630709+openperf@users.noreply.github.com> Date: Wed, 29 Apr 2026 12:56:43 +0800 Subject: [PATCH] fix(agents): inject resolved OAuth bearer into boundary-aware embedded streams (#73588) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes openclaw#73559. Extracts a shared wrapEmbeddedAgentStreamFn helper and applies it to both provider-owned and boundary-aware fallback paths in resolveEmbeddedAgentStreamFn, forwarding the resolved OAuth bearer (resolvedApiKey → authStorage → options.apiKey) and run abort signal so models routing through openai-codex-responses and other boundary-aware transports stop failing with 401 Missing bearer auth header. --- CHANGELOG.md | 1 + .../stream-resolution.test.ts | 155 ++++++++++++++++++ .../pi-embedded-runner/stream-resolution.ts | 88 ++++++---- 3 files changed, 213 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 902c16bbc62..000aa66e78c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -104,6 +104,7 @@ Docs: https://docs.openclaw.ai - WhatsApp/reliability: publish real transport-liveness into WhatsApp channel status and force earlier reconnects on silent transport stalls, so quiet healthy sessions stay connected while wedged sockets recover before the later remote 408 path. (#72656) Thanks @Sathvik-1007. - Core/channels: tighten selected runtime, media, and plugin edge-case handling while preserving existing behavior. Thanks @jesse-merhi. - Channels/WhatsApp: strip leaked plural tool-call XML wrappers on every WhatsApp-visible outbound path and allow `channels.whatsapp.exposeErrorText` to suppress visible error text per channel or account. (#71830) Thanks @rubencu. +- Agents/embedded-runner: inject the resolved OAuth bearer (and forward the run abort signal) on the boundary-aware embedded stream fallback so models that route through `openai-codex-responses` and other boundary-aware transports stop failing with `401 Unauthorized: Missing bearer or basic authentication in header`. Fixes #73559. (#73588) Thanks @openperf. ## 2026.4.27 diff --git a/src/agents/pi-embedded-runner/stream-resolution.test.ts b/src/agents/pi-embedded-runner/stream-resolution.test.ts index ceea120e18b..66ba5247ec7 100644 --- a/src/agents/pi-embedded-runner/stream-resolution.test.ts +++ b/src/agents/pi-embedded-runner/stream-resolution.test.ts @@ -1,11 +1,32 @@ +import type { StreamFn } from "@mariozechner/pi-agent-core"; import { streamSimple } from "@mariozechner/pi-ai"; import { describe, expect, it, vi } from "vitest"; +import * as providerTransportStream from "../provider-transport-stream.js"; import { describeEmbeddedAgentStreamStrategy, resolveEmbeddedAgentApiKey, resolveEmbeddedAgentStreamFn, } from "./stream-resolution.js"; +// Wrap createBoundaryAwareStreamFnForModel with a spy that delegates to the +// real implementation by default so existing routing tests still observe a +// real transport stream; per-test overrideBoundaryAwareStreamFnOnce() injects +// a probe stream when a regression test needs to inspect the wrapped +// transport's options. +vi.mock("../provider-transport-stream.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + createBoundaryAwareStreamFnForModel: vi.fn(actual.createBoundaryAwareStreamFnForModel), + }; +}); + +const overrideBoundaryAwareStreamFnOnce = (streamFn: StreamFn): void => { + vi.mocked(providerTransportStream.createBoundaryAwareStreamFnForModel).mockReturnValueOnce( + streamFn, + ); +}; + describe("describeEmbeddedAgentStreamStrategy", () => { it("describes provider-owned stream paths explicitly", () => { expect( @@ -203,4 +224,138 @@ describe("resolveEmbeddedAgentStreamFn", () => { signal: explicitSignal, }); }); + + it("injects the resolved run api key into the boundary-aware Codex Responses fallback", async () => { + const innerStreamFn = vi.fn(async (_model, _context, options) => options); + overrideBoundaryAwareStreamFnOnce(innerStreamFn as never); + const streamFn = resolveEmbeddedAgentStreamFn({ + currentStreamFn: undefined, + shouldUseWebSocketTransport: false, + sessionId: "session-1", + model: { + api: "openai-codex-responses", + provider: "openai-codex", + id: "gpt-5.5", + } as never, + resolvedApiKey: "oauth-bearer-token", + }); + + await expect( + streamFn({ provider: "openai-codex", id: "gpt-5.5" } as never, {} as never, {}), + ).resolves.toMatchObject({ apiKey: "oauth-bearer-token" }); + expect(innerStreamFn).toHaveBeenCalledTimes(1); + }); + + it("falls back to authStorage when no resolved api key is available for boundary-aware fallback", async () => { + const innerStreamFn = vi.fn(async (_model, _context, options) => options); + const authStorage = { + getApiKey: vi.fn(async () => "stored-bearer-token"), + }; + overrideBoundaryAwareStreamFnOnce(innerStreamFn as never); + const streamFn = resolveEmbeddedAgentStreamFn({ + currentStreamFn: undefined, + shouldUseWebSocketTransport: false, + sessionId: "session-1", + model: { + api: "openai-codex-responses", + provider: "openai-codex", + id: "gpt-5.5", + } as never, + authStorage, + }); + + await expect( + streamFn({ provider: "openai-codex", id: "gpt-5.5" } as never, {} as never, {}), + ).resolves.toMatchObject({ apiKey: "stored-bearer-token" }); + expect(authStorage.getApiKey).toHaveBeenCalledWith("openai-codex"); + }); + + it("forwards the run abort signal into the boundary-aware fallback when callers omit one", async () => { + const innerStreamFn = vi.fn(async (_model, _context, options) => options); + const runSignal = new AbortController().signal; + overrideBoundaryAwareStreamFnOnce(innerStreamFn as never); + const streamFn = resolveEmbeddedAgentStreamFn({ + currentStreamFn: undefined, + shouldUseWebSocketTransport: false, + sessionId: "session-1", + signal: runSignal, + model: { + api: "openai-codex-responses", + provider: "openai-codex", + id: "gpt-5.5", + } as never, + resolvedApiKey: "oauth-bearer-token", + }); + + await expect( + streamFn({ provider: "openai-codex", id: "gpt-5.5" } as never, {} as never, {}), + ).resolves.toMatchObject({ signal: runSignal, apiKey: "oauth-bearer-token" }); + }); + + it("does not overwrite an explicit signal on the boundary-aware fallback path", async () => { + const innerStreamFn = vi.fn(async (_model, _context, options) => options); + const runSignal = new AbortController().signal; + const explicitSignal = new AbortController().signal; + overrideBoundaryAwareStreamFnOnce(innerStreamFn as never); + const streamFn = resolveEmbeddedAgentStreamFn({ + currentStreamFn: undefined, + shouldUseWebSocketTransport: false, + sessionId: "session-1", + signal: runSignal, + model: { + api: "openai-codex-responses", + provider: "openai-codex", + id: "gpt-5.5", + } as never, + resolvedApiKey: "oauth-bearer-token", + }); + + await expect( + streamFn({ provider: "openai-codex", id: "gpt-5.5" } as never, {} as never, { + signal: explicitSignal, + }), + ).resolves.toMatchObject({ signal: explicitSignal }); + }); + + it("forwards the run signal on the sync boundary-aware fallback path without auth credentials", async () => { + const innerStreamFn = vi.fn(async (_model, _context, options) => options); + const runSignal = new AbortController().signal; + overrideBoundaryAwareStreamFnOnce(innerStreamFn as never); + const streamFn = resolveEmbeddedAgentStreamFn({ + currentStreamFn: undefined, + shouldUseWebSocketTransport: false, + sessionId: "session-1", + signal: runSignal, + model: { + api: "openai-codex-responses", + provider: "openai-codex", + id: "gpt-5.5", + } as never, + }); + + await expect( + streamFn({ provider: "openai-codex", id: "gpt-5.5" } as never, {} as never, {}), + ).resolves.toMatchObject({ signal: runSignal }); + }); + + it("does not strip cache boundary markers on the boundary-aware fallback path", async () => { + const innerStreamFn = vi.fn(async (_model, context, _options) => context); + overrideBoundaryAwareStreamFnOnce(innerStreamFn as never); + const streamFn = resolveEmbeddedAgentStreamFn({ + currentStreamFn: undefined, + shouldUseWebSocketTransport: false, + sessionId: "session-1", + model: { + api: "openai-codex-responses", + provider: "openai-codex", + id: "gpt-5.5", + } as never, + resolvedApiKey: "oauth-bearer-token", + }); + + const systemPrompt = "intro<>tail"; + await expect( + streamFn({ provider: "openai-codex", id: "gpt-5.5" } as never, { systemPrompt } as never, {}), + ).resolves.toMatchObject({ systemPrompt }); + }); }); diff --git a/src/agents/pi-embedded-runner/stream-resolution.ts b/src/agents/pi-embedded-runner/stream-resolution.ts index 1f2b504690b..c1e2f5aa225 100644 --- a/src/agents/pi-embedded-runner/stream-resolution.ts +++ b/src/agents/pi-embedded-runner/stream-resolution.ts @@ -73,36 +73,19 @@ export function resolveEmbeddedAgentStreamFn(params: { authStorage?: { getApiKey(provider: string): Promise }; }): StreamFn { if (params.providerStreamFn) { - const inner = params.providerStreamFn; - const normalizeContext = (context: Parameters[1]) => - context.systemPrompt - ? { - ...context, - systemPrompt: stripSystemPromptCacheBoundary(context.systemPrompt), - } - : context; - const mergeRunSignal = (options: Parameters[2]) => { - const signal = options?.signal ?? params.signal; - return signal ? { ...options, signal } : options; - }; - // Provider-owned transports bypass pi-coding-agent's default auth lookup, - // so keep injecting the resolved runtime apiKey for streamSimple-compatible - // transports that still read credentials from options.apiKey. - if (params.authStorage || params.resolvedApiKey) { - const { authStorage, model, resolvedApiKey } = params; - return async (m, context, options) => { - const apiKey = await resolveEmbeddedAgentApiKey({ - provider: model.provider, - resolvedApiKey, - authStorage, - }); - return inner(m, normalizeContext(context), { - ...mergeRunSignal(options), - apiKey: apiKey ?? options?.apiKey, - }); - }; - } - return (m, context, options) => inner(m, normalizeContext(context), mergeRunSignal(options)); + return wrapEmbeddedAgentStreamFn(params.providerStreamFn, { + runSignal: params.signal, + resolvedApiKey: params.resolvedApiKey, + authStorage: params.authStorage, + providerId: params.model.provider, + transformContext: (context) => + context.systemPrompt + ? { + ...context, + systemPrompt: stripSystemPromptCacheBoundary(context.systemPrompt), + } + : context, + }); } const currentStreamFn = params.currentStreamFn ?? streamSimple; @@ -124,9 +107,52 @@ export function resolveEmbeddedAgentStreamFn(params: { if (params.currentStreamFn === undefined || params.currentStreamFn === streamSimple) { const boundaryAwareStreamFn = createBoundaryAwareStreamFnForModel(params.model); if (boundaryAwareStreamFn) { - return boundaryAwareStreamFn; + // Boundary-aware transports read credentials from options.apiKey just + // like provider-owned streams, but the embedded run layer never gets to + // inject the resolved runtime key for them. Without this wrap, OAuth + // providers (e.g. openai-codex/gpt-5.5) hit the Responses API with an + // empty bearer and fail with 401 Missing bearer auth header. + return wrapEmbeddedAgentStreamFn(boundaryAwareStreamFn, { + runSignal: params.signal, + resolvedApiKey: params.resolvedApiKey, + authStorage: params.authStorage, + providerId: params.model.provider, + }); } } return currentStreamFn; } + +function wrapEmbeddedAgentStreamFn( + inner: StreamFn, + params: { + runSignal: AbortSignal | undefined; + resolvedApiKey: string | undefined; + authStorage: { getApiKey(provider: string): Promise } | undefined; + providerId: string; + transformContext?: (context: Parameters[1]) => Parameters[1]; + }, +): StreamFn { + const transformContext = + params.transformContext ?? ((context: Parameters[1]) => context); + const mergeRunSignal = (options: Parameters[2]) => { + const signal = options?.signal ?? params.runSignal; + return signal ? { ...options, signal } : options; + }; + if (!params.authStorage && !params.resolvedApiKey) { + return (m, context, options) => inner(m, transformContext(context), mergeRunSignal(options)); + } + const { authStorage, providerId, resolvedApiKey } = params; + return async (m, context, options) => { + const apiKey = await resolveEmbeddedAgentApiKey({ + provider: providerId, + resolvedApiKey, + authStorage, + }); + return inner(m, transformContext(context), { + ...mergeRunSignal(options), + apiKey: apiKey ?? options?.apiKey, + }); + }; +}