@florimondmanca
Last active July 16, 2024 01:48
URLLib3 transport implementation (Extracted from HTTPX)

Changelog:

  • 2024-07-15: updated to support recent HTTPX versions, based on @karpetrosyan's comment.
  • 2021-01 (ish): initial version

urllib3-transport

An HTTPX transport that uses urllib3 as the HTTP networking backend. (This was initially shipped with HTTPX.)

When used with HTTPX, this transport makes it easier to transition from Requests to HTTPX by keeping the same underlying HTTP networking layer.

Compatible / tested with: HTTPX 0.27.x

Note: not all urllib3 pool manager options are supported here — feel free to adapt this gist to your specific needs.

Usage

Using HTTPX:

import httpx
from urllib3_transport import Urllib3Transport

with httpx.Client(transport=Urllib3Transport()) as client:
    response = client.get("https://example.org")
    print(response)

To customize TLS settings, pass the same options as HTTPX (verify, cert, trust_env) directly to the transport, which builds the SSL context with them:

import httpx
from urllib3_transport import Urllib3Transport

with httpx.Client(transport=Urllib3Transport(verify="/tmp/client.pem")) as client:
    response = client.get("https://example.org")
    print(response)

See also Changing the verification defaults in the HTTPX docs.

License

MIT License

Copyright (c) 2020 Florimond Manca

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

import typing

import httpx
import urllib3
from httpx._config import create_ssl_context
from httpx._types import CertTypes, ProxyTypes, SyncByteStream, VerifyTypes


def httpx_headers_to_urllib3_headers(headers: httpx.Headers) -> urllib3.HTTPHeaderDict:
    urllib3_headers = urllib3.HTTPHeaderDict()
    for name, value in headers.multi_items():
        urllib3_headers.add(name, value)
    return urllib3_headers


class ResponseStream(SyncByteStream):
    CHUNK_SIZE = 1024

    def __init__(self, urllib3_stream: typing.Any) -> None:
        self._urllib3_stream = urllib3_stream

    def __iter__(self) -> typing.Iterator[bytes]:
        for chunk in self._urllib3_stream.stream(self.CHUNK_SIZE, decode_content=False):
            yield chunk

    def close(self) -> None:
        self._urllib3_stream.release_conn()


class Urllib3Transport(httpx.BaseTransport):
    def __init__(
        self,
        verify: VerifyTypes = True,
        trust_env: bool = True,
        max_pools: int = 10,
        maxsize: int = 10,
        cert: CertTypes | None = None,
        proxy: ProxyTypes | None = None,
    ) -> None:
        ssl_context = create_ssl_context(cert=cert, verify=verify, trust_env=trust_env)
        proxy = httpx.Proxy(url=proxy) if isinstance(proxy, (str, httpx.URL)) else proxy

        if proxy is None:
            self._pool = urllib3.PoolManager(
                ssl_context=ssl_context,
                num_pools=max_pools,
                maxsize=maxsize,
                block=False,
            )
        elif proxy.url.scheme in ("http", "https"):
            self._pool = urllib3.ProxyManager(
                str(proxy.url.origin),
                num_pools=max_pools,
                maxsize=maxsize,
                block=False,
                proxy_ssl_context=proxy.ssl_context,
                proxy_headers=httpx_headers_to_urllib3_headers(proxy.headers),
                ssl_context=ssl_context,
            )
        elif proxy.url.scheme == "socks5":
            from urllib3.contrib.socks import SOCKSProxyManager

            username, password = proxy.auth or (None, None)

            self._pool = SOCKSProxyManager(
                proxy_url=str(proxy.url),
                num_pools=max_pools,
                maxsize=maxsize,
                block=False,
                username=username,
                password=password,
            )
        else:  # pragma: no cover
            raise ValueError(
                "Proxy protocol must be either 'http', 'https', or 'socks5'," f" but got {proxy.url.scheme!r}."
            )

    def handle_request(self, request: httpx.Request) -> httpx.Response:
        timeouts = request.extensions.get("timeout", {})

        connect_timeout = timeouts.get("connect", None)
        read_timeout = timeouts.get("read", None)

        urllib3_timeout = urllib3.Timeout(
            connect=connect_timeout,
            read=read_timeout,
        )

        response = self._pool.request(
            request.method,
            str(request.url),
            body=request.content,
            headers=httpx_headers_to_urllib3_headers(request.headers),
            redirect=False,
            preload_content=False,
            timeout=urllib3_timeout,
        )

        return httpx.Response(
            status_code=response.status,
            headers=httpx.Headers([(name, value) for name, value in response.headers.iteritems()]),
            content=ResponseStream(response),
            extensions={"urllib3_response": response},
        )

@psymbio

psymbio commented Dec 8, 2023

How can this be updated for the newest version of httpcore (v1.0.2)?

@florimondmanca
Author

@psymbio Hi, I haven't followed recent HTTPCore development. Do you know what changed, and what the replacement for this byte stream API might be?

@psymbio

psymbio commented Dec 8, 2023

Currently, looking at this - I'll get back.

@psymbio

psymbio commented Dec 8, 2023

I'm pretty new to all this but here's my take on it, I hope you don't mind if I develop this iteratively.

I think, somewhere between the old and the new version _sync in v0.12.x got split into HTTP11 and HTTP2 and so the new implementation of URLLib3ByteStream would look something like:

from typing import Dict, Iterator, List, Optional, Tuple
import logging

from httpcore._sync.http11 import HTTP11ConnectionByteStream, HTTP11Connection
from httpcore._models import Request
from httpcore._trace import Trace
from httpcore._synchronization import ShieldCancellation

logger = logging.getLogger("httpcore.http11")

class URLLib3ByteStream(HTTP11ConnectionByteStream):
    def __init__(self, connection: HTTP11Connection, request: Request) -> None:
        self._connection = connection
        self._request = request
        self._closed = False

    def __iter__(self) -> Iterator[bytes]:
        kwargs = {"request": self._request}
        try:
            with Trace("receive_response_body", logger, self._request, kwargs):
                for chunk in self._connection._receive_response_body(**kwargs):
                    yield chunk
        except BaseException as exc:
            # If we get an exception while streaming the response,
            # we want to close the response (and possibly the connection)
            # before raising that exception.
            with ShieldCancellation():
                self.close()
            raise exc

    def close(self) -> None:
        if not self._closed:
            self._closed = True
            with Trace("response_closed", logger, self._request):
                self._connection._response_closed()

That covers HTTP/1.1 (should we do this for HTTP/2 as well?). However, instead of simply streaming the response, it takes the connection and the request as inputs. Will this work with the current structure, where URLLib3Transport is what actually makes the request?

For httpcore.SyncHTTPTransport it is probably ConnectionInterface here: https://github.com/encode/httpcore/blob/master/httpcore/_sync/interfaces.py

@tomchristie

@psymbio Ah great, it'd be wonderful to get a URLLib3 backed transport. Yeah this one is out-of-date.

Let's see if I can help put you on the right track here...

You don't want to be looking at httpcore at all here.
The transport API is defined entirely within httpx.

To start with take a look at https://www.python-httpx.org/advanced/#writing-custom-transports

I'd suggest you start by attempting a custom transport that uses urllib3 and returns a non-streaming response.

import httpx
import urllib3

class URLLib3Transport(httpx.BaseTransport):
    def __init__(self):
        self.pool = urllib3.PoolManager()

    def handle_request(self, request):
        # Make a request using the `urllib3` connection pool.
        urllib3_response = self.pool.request(...)

        # Coerce the returned instance into an `httpx.Response`.
        return httpx.Response(...)

That'll be a good starting point to orientate yourself from.
We can work forward from there.

@tomchristie

Linking to the associated discussion... encode/httpx#2994

@Dao-GPT

Dao-GPT commented Mar 10, 2024

@psymbio What's the progress? I am searching for the same solution.

@psymbio

psymbio commented Mar 11, 2024

@Dao-GPT solution in the linked discussion: encode/httpx#2994

@karpetrosyan

I will try to recreate the urllib3 transport; I think it can be useful in some cases.
Something like this will support timeouts, proxies, and response streaming.

import typing

import httpx
import urllib3
from httpx._config import create_ssl_context
from httpx._types import CertTypes, ProxyTypes, SyncByteStream, VerifyTypes


def httpx_headers_to_urllib3_headers(headers: httpx.Headers) -> urllib3.HTTPHeaderDict:
    urllib3_headers = urllib3.HTTPHeaderDict()
    for name, value in headers.multi_items():
        urllib3_headers.add(name, value)
    return urllib3_headers


class ResponseStream(SyncByteStream):
    CHUNK_SIZE = 1024

    def __init__(self, urllib3_stream: typing.Any) -> None:
        self._urllib3_stream = urllib3_stream

    def __iter__(self) -> typing.Iterator[bytes]:
        for chunk in self._urllib3_stream.stream(self.CHUNK_SIZE):
            yield chunk

    def close(self) -> None:
        self._urllib3_stream.release_conn()


class Urllib3Transport(httpx.BaseTransport):
    def __init__(
        self,
        verify: VerifyTypes = True,
        trust_env: bool = True,
        max_pools: int = 10,
        maxsize: int = 10,
        cert: CertTypes | None = None,
        proxy: ProxyTypes | None = None,
    ) -> None:
        ssl_context = create_ssl_context(cert=cert, verify=verify, trust_env=trust_env)
        proxy = httpx.Proxy(url=proxy) if isinstance(proxy, (str, httpx.URL)) else proxy

        if proxy is None:
            self._pool = urllib3.PoolManager(
                ssl_context=ssl_context,
                num_pools=max_pools,
                maxsize=maxsize,
                block=False,
            )
        elif proxy.url.scheme in ("http", "https"):
            self._pool = urllib3.ProxyManager(
                str(proxy.url.origin),
                num_pools=max_pools,
                maxsize=maxsize,
                block=False,
                proxy_ssl_context=proxy.ssl_context,
                proxy_headers=httpx_headers_to_urllib3_headers(proxy.headers),
                ssl_context=ssl_context,
            )
        elif proxy.url.scheme == "socks5":
            from urllib3.contrib.socks import SOCKSProxyManager

            username, password = proxy.auth or (None, None)

            self._pool = SOCKSProxyManager(
                proxy_url=str(proxy.url),
                num_pools=max_pools,
                maxsize=maxsize,
                block=False,
                username=username,
                password=password,
            )
        else:  # pragma: no cover
            raise ValueError(
                "Proxy protocol must be either 'http', 'https', or 'socks5'," f" but got {proxy.url.scheme!r}."
            )

    def handle_request(self, request: httpx.Request) -> httpx.Response:
        timeouts = request.extensions.get("timeout", {})

        connect_timeout = timeouts.get("connect", None)
        read_timeout = timeouts.get("read", None)

        urllib3_timeout = urllib3.Timeout(
            connect=connect_timeout,
            read=read_timeout,
        )

        response = self._pool.request(
            request.method,
            str(request.url),
            body=request.content,
            headers=httpx_headers_to_urllib3_headers(request.headers),
            redirect=False,
            preload_content=False,
            timeout=urllib3_timeout,
        )

        return httpx.Response(
            status_code=response.status,
            headers=httpx.Headers([(name, value) for name, value in response.headers.iteritems()]),
            content=ResponseStream(response),
            extensions={"urllib3_response": response},
        )

@TimothyHeal

Thank you so much.

@florimondmanca
Author

Thanks @karpetrosyan, I've updated the gist header to point to your up to date solution. 👍

@Dao-GPT

Dao-GPT commented Jul 15, 2024

@karpetrosyan
I tested your solution in Pyodide with

with httpx.Client(transport=Urllib3Transport()) as client:
    response = client.get("https://example.org")
    print(response)

But the following error occurred:
Access to fetch at 'https://example.org/' from origin 'https://jupyterlite-pyodide-kernel.readthedocs.io' has been blocked by CORS policy: No 'Access-Control-Allow-Origin' header is present on the requested resource. If an opaque response serves your needs, set the request's mode to 'no-cors' to fetch the resource with CORS disabled.

Please clarify.
Thanks.

@florimondmanca
Author

florimondmanca commented Jul 15, 2024

@Dao-GPT Are you able to request other URLs, without Pyodide?

I was able to fetch example.org, although with an issue on gzip encoding (I had to enforce Accept-Encoding: text/plain in the request, may raise an issue on the HTTPX repo for this). Edit: can't reproduce w/o the urllib3 transport, so it's a potential pitfall from this updated solution. Edit: looks like decode_content=False when calling urllib3_stream.stream() solves it. I will update my gist.

  File "/home/florimond/.pyenv/versions/3.12.1/lib/python3.12/site-packages/httpx/_models.py", line 830, in iter_bytes
    decoded = decoder.decode(raw_bytes)
              ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/florimond/.pyenv/versions/3.12.1/lib/python3.12/site-packages/httpx/_decoders.py", line 80, in decode
    raise DecodingError(str(exc)) from exc
httpx.DecodingError: Error -3 while decompressing data: incorrect header check
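
The likely cause can be reproduced with the standard library alone. The sketch below assumes the failure comes from the body being decompressed twice: once by urllib3 (the default when decode_content is not disabled) and once more by HTTPX, which still sees the Content-Encoding: gzip header. The gzip_decode helper is hypothetical and only approximates a gzip content decoder.

```python
import gzip
import zlib


def gzip_decode(data: bytes) -> bytes:
    """Hypothetical stand-in for a gzip content decoder."""
    decoder = zlib.decompressobj(zlib.MAX_WBITS | 16)  # gzip framing only
    return decoder.decompress(data) + decoder.flush()


body = b"<!doctype html><title>Example Domain</title>"
wire_bytes = gzip.compress(body)  # what the server sends

# With decode_content=False the transport passes the compressed bytes
# through, so a single decode succeeds.
once = gzip_decode(wire_bytes)

# If the transport has already decompressed the body, decoding it a
# second time fails because the data no longer starts with a gzip header.
try:
    gzip_decode(once)
except zlib.error as exc:
    double_decode_error = str(exc)
else:
    double_decode_error = None
```

Here double_decode_error carries the same "incorrect header check" message as the traceback above.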

Your error seems like a specific issue with how Pyodide / Jupyter operates... If you're making a request through a browser, CORS checks apply, which could explain the error. Try applying the Mode: no-cors header, or look for the keywords "no-cors" on a search engine. E.g. [this post](https://medium.com/@cybersphere/fetch-api-the-ultimate-guide-to-cors-and-no-cors-cbcef88d371e).

@Dao-GPT

Dao-GPT commented Jul 16, 2024

@florimondmanca On my local laptop, the same issue as yours happened with the original code.

Then, following your suggestion, I added decode_content=False to the urllib3_stream.stream() call, and the code works.

I will try to fix the issue specific to Pyodide.

Thanks.
