Skip to content

Instantly share code, notes, and snippets.

Last active April 29, 2024 12:36
Show Gist options
  • Save richardhundt/17dfccb5c1e253f798999fc2b2417d7e to your computer and use it in GitHub Desktop.
Save richardhundt/17dfccb5c1e253f798999fc2b2417d7e to your computer and use it in GitHub Desktop.
Patch httpx ASGITransport to stream responses for testing
import asyncio
from contextlib import asynccontextmanager
import typing
import asyncio
from httpx._models import Request, Response
from httpx._transports.asgi import ASGITransport
from httpx._types import AsyncByteStream
class ASGIResponseByteStream(AsyncByteStream):
    """Async byte stream that wraps an async generator of body chunks.

    Iteration is delegated to the wrapped generator, and closing the stream
    closes that generator.
    """

    def __init__(self, stream: typing.AsyncGenerator[bytes, None]) -> None:
        # The generator that produces the response body chunks.
        self._stream = stream

    def __aiter__(self) -> typing.AsyncIterator[bytes]:
        """Return the wrapped generator's own async iterator."""
        chunks = self._stream
        return chunks.__aiter__()

    async def aclose(self) -> None:
        """Close the wrapped generator."""
        chunks = self._stream
        await chunks.aclose()
async def patch_handle_async_request(
    self: ASGITransport,
    request: Request,
) -> Response:
    """Dispatch *request* to the ASGI app and stream the response body.

    Intended as a replacement for ``ASGITransport.handle_async_request``.
    The app is run in a background task and every ``http.response.body``
    chunk is pushed onto a queue, so the returned ``Response`` can be
    consumed while the app is still producing output.

    Args:
        self: The transport whose ``app``, ``client``, ``root_path`` and
            ``raise_app_exceptions`` attributes are used.
        request: The outgoing request; its stream must be an
            ``AsyncByteStream``.

    Returns:
        A ``Response`` built once the app has sent ``http.response.start``;
        its stream yields body chunks as the app sends them.

    Raises:
        Exception: Whatever the app raised, if it fails before starting a
            response. A failure after the response has started surfaces
            from the response stream instead (always when the response was
            incomplete, otherwise only if ``raise_app_exceptions`` is set).
        AssertionError: If the app returns without starting a response.
    """
    assert isinstance(request.stream, AsyncByteStream)

    # ASGI scope.
    scope = {
        "type": "http",
        "asgi": {"version": "3.0"},
        "http_version": "1.1",
        "method": request.method,
        "headers": [(k.lower(), v) for (k, v) in request.headers.raw],
        "scheme": request.url.scheme,
        "path": request.url.path,
        "raw_path": request.url.raw_path,
        "query_string": request.url.query,
        "server": (request.url.host, request.url.port),
        "client": self.client,
        "root_path": self.root_path,
    }

    # Request.
    request_body_chunks = request.stream.__aiter__()
    request_complete = False

    # Response.
    status_code = None
    response_headers = None
    sentinel = object()  # queued after the last body chunk
    body_queue: asyncio.Queue = asyncio.Queue()
    response_started = asyncio.Event()
    response_complete = asyncio.Event()

    # ASGI callables.

    async def receive() -> typing.Dict[str, typing.Any]:
        nonlocal request_complete

        if request_complete:
            # Nothing more to read: report a disconnect only once the
            # response has been fully sent.
            await response_complete.wait()
            return {"type": "http.disconnect"}

        try:
            body = await request_body_chunks.__anext__()
        except StopAsyncIteration:
            request_complete = True
            return {"type": "http.request", "body": b"", "more_body": False}
        return {"type": "http.request", "body": body, "more_body": True}

    async def send(message: typing.Dict[str, typing.Any]) -> None:
        nonlocal status_code, response_headers

        if message["type"] == "http.response.start":
            assert not response_started.is_set()
            status_code = message["status"]
            response_headers = message.get("headers", [])
            response_started.set()

        elif message["type"] == "http.response.body":
            assert response_started.is_set()
            assert not response_complete.is_set()
            body = message.get("body", b"")
            more_body = message.get("more_body", False)

            if body and request.method != "HEAD":
                await body_queue.put(body)

            if not more_body:
                await body_queue.put(sentinel)
                response_complete.set()

    async def run_app() -> None:
        try:
            await self.app(scope, receive, send)
        except Exception:  # noqa: PIE-786
            if self.raise_app_exceptions or not response_complete.is_set():
                raise
        finally:
            # Always wake the body consumer, even if the app exited without
            # finishing the response; otherwise it would wait forever.
            body_queue.put_nowait(sentinel)

    # Keep a reference to the task so it is not garbage collected mid-run.
    app_task = asyncio.create_task(run_app())

    async def body_stream() -> typing.AsyncGenerator[bytes, None]:
        try:
            while True:
                body = await body_queue.get()
                if body is sentinel:
                    break
                yield body
            # Surface any error the app raised while (or after) streaming.
            await app_task
        finally:
            # The consumer stopped early or the stream ended: make sure the
            # app does not keep running in the background.
            if not app_task.done():
                app_task.cancel()

    # Wait for the response to start, but also notice if the app exits
    # first -- waiting on the event alone would hang in that case.
    start_waiter = asyncio.create_task(response_started.wait())
    try:
        await asyncio.wait(
            {app_task, start_waiter},
            return_when=asyncio.FIRST_COMPLETED,
        )
    except BaseException:
        app_task.cancel()
        raise
    finally:
        start_waiter.cancel()

    if not response_started.is_set():
        # The app finished before sending "http.response.start";
        # re-raise its exception, if it had one.
        app_task.result()

    assert status_code is not None
    assert response_headers is not None

    stream = ASGIResponseByteStream(body_stream())

    return Response(status_code, headers=response_headers, stream=stream)
@asynccontextmanager
async def patch_asgi_transport() -> typing.AsyncIterator[None]:
    """Temporarily swap in the streaming ``handle_async_request``.

    While the ``async with`` body runs, ``ASGITransport.handle_async_request``
    is replaced by ``patch_handle_async_request``. The previous method is put
    back on exit, including when the body raises.
    """
    restore = ASGITransport.handle_async_request
    ASGITransport.handle_async_request = patch_handle_async_request
    try:
        yield
    finally:
        ASGITransport.handle_async_request = restore
Copy link

richardhundt commented Jul 28, 2023

Example usage:

async def test_client(event_loop, test_app):
    from tests.patch_httpx_asgi import patch_asgi_transport
    async with patch_asgi_transport():
        async with AsyncClient(base_url="http://testserver", app=test_app) as async_client:
            yield async_client

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment