Skip to content

Instantly share code, notes, and snippets.

@tomchristie
Created Jan 15, 2020
Embed
What would you like to do?
An ASGI proxy service.
import httpx
from starlette.requests import Request
from starlette.responses import StreamingResponse
class Proxy:
    """ASGI application that forwards incoming HTTP requests to another host.

    Each request is re-issued over HTTPS against ``hostname`` through a
    shared ``httpx.AsyncClient`` and the upstream response is streamed back
    to the caller. The ``get_*`` methods are hooks that subclasses can
    override to change how the outgoing request is built.

    The ASGI lifespan protocol is handled as well, so the shared client is
    closed when the server shuts down instead of being leaked.
    """

    def __init__(self, hostname):
        """Remember the upstream host and create the shared HTTP client.

        Args:
            hostname: Network location that replaces the netloc of every
                incoming request URL.
        """
        self.hostname = hostname
        self.client = httpx.AsyncClient()

    async def __call__(self, scope, receive, send):
        """ASGI entry point.

        Args:
            scope: ASGI connection scope; ``scope['type']`` selects the path.
            receive: ASGI receive callable.
            send: ASGI send callable.

        Raises:
            NotImplementedError: If the scope type is neither ``'http'`` nor
                ``'lifespan'``.
        """
        scope_type = scope['type']
        if scope_type == 'lifespan':
            await self._lifespan(receive, send)
            return
        if scope_type != 'http':
            # An explicit raise (rather than ``assert``) keeps the guard in
            # place when Python runs with -O.
            raise NotImplementedError(
                f"Proxy only handles 'http' and 'lifespan' scopes, got {scope_type!r}"
            )

        request = Request(scope, receive=receive)
        # NOTE(review): newer httpx releases renamed ``allow_redirects`` to
        # ``follow_redirects`` and take streamed bodies via ``content=``;
        # confirm against the httpx version this project pins.
        async with self.client.stream(
            method=self.get_method(request),
            url=self.get_url(request),
            headers=self.get_headers(request),
            data=self.get_body(request),
            allow_redirects=False,
        ) as response:
            # The response must be sent while the stream context is still
            # open, because the body iterator reads from the live connection.
            downstream = StreamingResponse(
                status_code=response.status_code,
                headers=response.headers,
                content=response.aiter_raw(),
            )
            await downstream(scope, receive, send)

    async def _lifespan(self, receive, send):
        """Serve the lifespan protocol and close the client on shutdown."""
        while True:
            message = await receive()
            if message['type'] == 'lifespan.startup':
                await send({'type': 'lifespan.startup.complete'})
            elif message['type'] == 'lifespan.shutdown':
                await self.client.aclose()
                await send({'type': 'lifespan.shutdown.complete'})
                return

    def get_method(self, request):
        """Return the HTTP method to use for the upstream request."""
        return request.method

    def get_url(self, request):
        """Return the upstream URL: the incoming URL with scheme and host swapped."""
        return str(request.url.replace(scheme="https", netloc=self.hostname))

    def get_headers(self, request):
        """Return the raw request headers as a list, without the ``host`` header.

        Dropping ``host`` leaves the client free to set one that matches the
        upstream URL.
        """
        return [(key, value) for key, value in request.headers.raw if key != b'host']

    def get_body(self, request):
        """Return the request body stream, or ``None`` when no body is declared.

        A body is considered present when the request carries either a
        ``content-length`` or a ``transfer-encoding`` header.
        """
        if 'content-length' in request.headers or 'transfer-encoding' in request.headers:
            return request.stream()
        return None
# Module-level ASGI application object: a proxy in front of www.google.com.
app = Proxy(
    hostname="www.google.com",
)
@tomchristie

This comment has been minimized.

Copy link
Owner Author

@tomchristie tomchristie commented Jan 15, 2020

Was also interested in seeing what it'd look like if we had resources that we wanted to run strictly inside an async with context managed block...

    async def __call__(self, scope, receive, send):
        """ASGI entry point handling both ``http`` and ``lifespan`` scopes.

        For ``http`` scopes the request is forwarded through ``self.client``
        and the upstream response is streamed back. For the ``lifespan``
        scope the client is created on startup and kept open, inside an
        ``async with`` block, until the server announces shutdown. Any other
        scope type falls through and returns without sending anything.

        Args:
            scope: ASGI connection scope.
            receive: ASGI receive callable.
            send: ASGI send callable.
        """
        if scope['type'] == 'http':
            request = Request(scope, receive=receive)
            async with self.client.stream(
                method=self.get_method(request),
                url=self.get_url(request),
                headers=self.get_headers(request),
                data=self.get_body(request),
                allow_redirects=False
            ) as response:
                # Send the response while the upstream stream is still open.
                app = StreamingResponse(
                    status_code=response.status_code,
                    headers=response.headers,
                    content=response.aiter_raw()
                )
                await app(scope, receive, send)

        elif scope['type'] == 'lifespan':
            message = await receive()
            assert message['type'] == 'lifespan.startup'

            # The client lives exactly as long as this block: it is opened
            # before startup is acknowledged and closed once the next
            # lifespan message (the shutdown request) arrives.
            async with httpx.AsyncClient() as self.client:
                await send({'type': 'lifespan.startup.complete'})
                message = await receive()

            # The ASGI shutdown event is named 'lifespan.shutdown'; there is
            # no 'lifespan.complete' message, so checking for that name made
            # this assertion fail on every shutdown.
            assert message['type'] == 'lifespan.shutdown'
            await send({'type': 'lifespan.shutdown.complete'})
@dhirschfeld

This comment has been minimized.

Copy link

@dhirschfeld dhirschfeld commented Jun 1, 2020

Hi @tomchristie,
@pgjones pointed me to this very useful snippet, so thanks for posting it! It seems to work great for HTTP requests, but the app I'm trying to proxy (a dask dashboard) also uses websockets. I figure I also have to implement something with the WebSocket class, but I'm a bit sketchy on the exact details of how to do that. If you have time to give some pointers, that would be very much appreciated; otherwise, I'll post my solution here if I manage to get it working...

@dhirschfeld

This comment has been minimized.

Copy link

@dhirschfeld dhirschfeld commented Jun 3, 2020

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment