@florimondmanca
Last active March 11, 2024 10:12
URLLib3 transport implementation (Extracted from HTTPX)

urllib3-transport

An HTTPCore transport that uses urllib3 as the HTTP networking backend. (This was initially shipped with HTTPX.)

When used with HTTPX, this transport makes it easier to transition from Requests to HTTPX by keeping the same underlying HTTP networking layer.

Compatible with: HTTPX 0.15.x, 0.16.x (i.e. HTTPCore 0.11.x and HTTPCore 0.12.x).

Note: not all urllib3 pool manager options are supported here — feel free to adapt this gist to your specific needs.

Usage

Using HTTPX:

import httpx
from urllib3_transport import URLLib3Transport

with httpx.Client(transport=URLLib3Transport()) as client:
    response = client.get("https://example.org")
    print(response)

If you want to pass a custom ssl_context that uses the same options as HTTPX (verify, cert, trust_env), use the httpx.create_ssl_context() helper:

import httpx
from urllib3_transport import URLLib3Transport

ssl_context = httpx.create_ssl_context(verify="/tmp/client.pem")

with httpx.Client(transport=URLLib3Transport(ssl_context=ssl_context)) as client:
    response = client.get("https://example.org")
    print(response)

See also Changing the verification defaults in the HTTPX docs.

License

MIT License

Copyright (c) 2020 Florimond Manca

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

import socket
import ssl
from typing import Dict, Iterator, List, Optional, Tuple

import httpcore
import urllib3


class URLLib3ByteStream(httpcore.SyncByteStream):
    def __init__(self, response: urllib3.HTTPResponse) -> None:
        self._response = response

    def __iter__(self) -> Iterator[bytes]:
        try:
            # Yield raw bytes; content decoding is left to the caller (HTTPX).
            for chunk in self._response.stream(4096, decode_content=False):
                yield chunk
        except socket.error as exc:
            raise httpcore.NetworkError(exc) from exc

    def close(self) -> None:
        # Return the underlying connection to the urllib3 pool.
        self._response.release_conn()


class URLLib3Transport(httpcore.SyncHTTPTransport):
    def __init__(
        self,
        *,
        ssl_context: Optional[ssl.SSLContext] = None,
        pool_connections: int = 10,
        pool_maxsize: int = 10,
        pool_block: bool = False,
    ) -> None:
        self._pool = urllib3.PoolManager(
            ssl_context=ssl_context,
            num_pools=pool_connections,
            maxsize=pool_maxsize,
            block=pool_block,
        )

    def request(
        self,
        method: bytes,
        url: Tuple[bytes, bytes, Optional[int], bytes],
        headers: Optional[List[Tuple[bytes, bytes]]] = None,
        stream: Optional[httpcore.SyncByteStream] = None,
        ext: Optional[dict] = None,
    ) -> Tuple[int, List[Tuple[bytes, bytes]], httpcore.SyncByteStream, dict]:
        headers = [] if headers is None else headers
        stream = httpcore.PlainByteStream(b"") if stream is None else stream
        ext = {} if ext is None else ext

        # Fall back to "no timeouts" when the caller did not provide any.
        timeout: Dict[str, float] = ext.get("timeout", {})
        urllib3_timeout = urllib3.util.Timeout(
            connect=timeout.get("connect"), read=timeout.get("read")
        )

        # Only send a body if the request headers announce one.
        chunked = False
        content_length = 0
        for header_key, header_value in headers:
            header_key = header_key.lower()
            if header_key == b"transfer-encoding":
                chunked = header_value == b"chunked"
            if header_key == b"content-length":
                content_length = int(header_value.decode("ascii"))
        body = stream if chunked or content_length else None

        # Rebuild the URL string, omitting the port when it is the scheme default.
        # Both keys must be bytes, since `scheme` is bytes.
        scheme, host, port, path = url
        default_port = {b"http": 80, b"https": 443}.get(scheme)
        if port is None or port == default_port:
            url_str = "%s://%s%s" % (
                scheme.decode("ascii"),
                host.decode("ascii"),
                path.decode("ascii"),
            )
        else:
            url_str = "%s://%s:%d%s" % (
                scheme.decode("ascii"),
                host.decode("ascii"),
                port,
                path.decode("ascii"),
            )

        try:
            response = self._pool.urlopen(
                method=method.decode(),
                url=url_str,
                headers={
                    key.decode("ascii"): value.decode("ascii") for key, value in headers
                },
                body=body,
                redirect=False,
                assert_same_host=False,
                retries=0,
                preload_content=False,
                chunked=chunked,
                timeout=urllib3_timeout,
                pool_timeout=timeout.get("pool"),
            )
        except (urllib3.exceptions.SSLError, socket.error) as exc:
            raise httpcore.NetworkError(exc) from exc

        status_code = response.status
        reason_phrase = response.reason
        headers = list(response.headers.items())
        stream = URLLib3ByteStream(response)
        ext = {"reason": reason_phrase, "http_version": "HTTP/1.1"}
        return (status_code, headers, stream, ext)

    def close(self) -> None:
        self._pool.clear()


class URLLib3ProxyTransport(URLLib3Transport):
    def __init__(
        self,
        *,
        proxy_url: str,
        proxy_headers: Optional[dict] = None,
        ssl_context: Optional[ssl.SSLContext] = None,
        pool_connections: int = 10,
        pool_maxsize: int = 10,
        pool_block: bool = False,
    ) -> None:
        # Same transport, but requests go through a urllib3 ProxyManager.
        self._pool = urllib3.ProxyManager(
            proxy_url=proxy_url,
            proxy_headers=proxy_headers,
            ssl_context=ssl_context,
            num_pools=pool_connections,
            maxsize=pool_maxsize,
            block=pool_block,
        )
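
The URL-reconstruction step in request() can be looked at in isolation. The following is a minimal sketch rather than part of the gist: build_url and DEFAULT_PORTS are hypothetical names introduced here for illustration. It takes the same (scheme, host, port, path) bytes tuple that request() receives and omits the port when it matches the scheme default. Note that the keys of the default-port mapping have to be bytes, because the scheme arrives as bytes.

```python
from typing import Optional, Tuple

# Hypothetical helper names; not part of the gist or of HTTPCore.
DEFAULT_PORTS = {b"http": 80, b"https": 443}


def build_url(url: Tuple[bytes, bytes, Optional[int], bytes]) -> str:
    """Turn an HTTPCore-style URL tuple into the string urllib3 expects."""
    scheme, host, port, path = url
    netloc = host.decode("ascii")
    # Only spell out the port when it differs from the scheme default.
    if port is not None and port != DEFAULT_PORTS.get(scheme):
        netloc = f"{netloc}:{port}"
    return f"{scheme.decode('ascii')}://{netloc}{path.decode('ascii')}"


print(build_url((b"https", b"example.org", 443, b"/")))    # https://example.org/
print(build_url((b"http", b"localhost", 8080, b"/ping")))  # http://localhost:8080/ping
```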

psymbio commented Dec 8, 2023

How can this be updated for the newest version of httpcore (v1.0.2)?

@florimondmanca
Author

@psymbio Hi, I haven't followed the recent HTTPCore development. Do you know what has changed, and what replacement there might be for this byte stream API?


psymbio commented Dec 8, 2023

Currently looking into this. I'll get back to you.


psymbio commented Dec 8, 2023

I'm pretty new to all this, but here's my take on it. I hope you don't mind if I develop this iteratively.

I think that somewhere between the old and the new versions, _sync in v0.12.x got split into HTTP11 and HTTP2, so the new implementation of URLLib3ByteStream would look something like:

from typing import Dict, Iterator, List, Optional, Tuple
import logging

from httpcore._sync.http11 import HTTP11ConnectionByteStream, HTTP11Connection
from httpcore._models import Request
from httpcore._trace import Trace
from httpcore._synchronization import ShieldCancellation

logger = logging.getLogger("httpcore.http11")

class URLLib3ByteStream(HTTP11ConnectionByteStream):
    def __init__(self, connection: HTTP11Connection, request: Request) -> None:
        self._connection = connection
        self._request = request
        self._closed = False

    def __iter__(self) -> Iterator[bytes]:
        kwargs = {"request": self._request}
        try:
            with Trace("receive_response_body", logger, self._request, kwargs):
                for chunk in self._connection._receive_response_body(**kwargs):
                    yield chunk
        except BaseException as exc:
            # If we get an exception while streaming the response,
            # we want to close the response (and possibly the connection)
            # before raising that exception.
            with ShieldCancellation():
                self.close()
            raise exc

    def close(self) -> None:
        if not self._closed:
            self._closed = True
            with Trace("response_closed", logger, self._request):
                self._connection._response_closed()

This is for HTTP/1.1 (should we do this for HTTP/2 instead, or for both?). However, instead of simply streaming the response, it takes the connection and the request to process. Will this work with the current structure, where URLLib3Transport is what actually makes the request?

The replacement for httpcore.SyncHTTPTransport is probably ConnectionInterface, defined here: https://github.com/encode/httpcore/blob/master/httpcore/_sync/interfaces.py

@tomchristie

@psymbio Ah great, it'd be wonderful to get a URLLib3-backed transport. Yeah, this one is out of date.

Let's see if I can help put you on the right track here...

You don't want to be looking at httpcore at all here.
The transport API is defined entirely within httpx.

To start with, take a look at https://www.python-httpx.org/advanced/#writing-custom-transports

I'd suggest you start by attempting a custom transport that uses urllib3 and returns a non-streaming response.

import httpx
import urllib3

class URLLib3Transport(httpx.BaseTransport):
    def __init__(self):
        self.pool = urllib3.PoolManager()

    def handle_request(self, request):
        # Make a request using the `urllib3` connection pool.
        urllib3_response = self.pool.request(...)

        # Coerce the returned instance into an `httpx.Response`.
        return httpx.Response(...)

That'll be a good starting point to orientate yourself from.
We can work forward from there.

@tomchristie

Linking to the associated discussion... encode/httpx#2994


Dao-GPT commented Mar 10, 2024

@psymbio What's the progress? I am searching for the same solution.


psymbio commented Mar 11, 2024

@Dao-GPT The solution is in the linked discussion: encode/httpx#2994
