@tomchristie
Created January 15, 2020 13:06
An ASGI proxy service.
import httpx
from starlette.requests import Request
from starlette.responses import StreamingResponse

# Written against httpx as of early 2020. Later httpx releases renamed
# `allow_redirects` to `follow_redirects` and take streamed bodies via `content=`.


class Proxy:
    def __init__(self, hostname):
        self.hostname = hostname
        self.client = httpx.AsyncClient()

    async def __call__(self, scope, receive, send):
        assert scope['type'] == 'http'
        request = Request(scope, receive=receive)
        # Stream the upstream response straight back to the caller.
        async with self.client.stream(
            method=self.get_method(request),
            url=self.get_url(request),
            headers=self.get_headers(request),
            data=self.get_body(request),
            allow_redirects=False
        ) as response:
            app = StreamingResponse(
                status_code=response.status_code,
                headers=response.headers,
                # Raw bytes, so any upstream content-encoding is passed through untouched.
                content=response.aiter_raw()
            )
            await app(scope, receive, send)

    def get_method(self, request):
        return request.method

    def get_url(self, request):
        # Same path and query, but pointed at the upstream host over https.
        return str(request.url.replace(scheme="https", netloc=self.hostname))

    def get_headers(self, request):
        # Drop the incoming Host header so the client sets it for the upstream host.
        return [(key, value) for key, value in request.headers.raw if key != b'host']

    def get_body(self, request):
        # Only forward a body if the incoming request declares one.
        if 'content-length' in request.headers or 'transfer-encoding' in request.headers:
            return request.stream()
        return None


app = Proxy(hostname="www.google.com")
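
To make the rewriting done by get_url and get_headers concrete, here is a rough standard-library-only sketch of the same two transformations applied to plain values. The helper names rewrite_url and strip_host and the sample URL are made up for illustration; they are not part of the gist or of Starlette.

```python
from urllib.parse import urlsplit, urlunsplit


def rewrite_url(url, hostname):
    """Point an incoming URL at the upstream host over https, keeping path and query."""
    parts = urlsplit(url)
    return urlunsplit(parts._replace(scheme="https", netloc=hostname))


def strip_host(raw_headers):
    """Drop the Host header from ASGI-style (bytes, bytes) pairs with lowercased names."""
    return [(key, value) for key, value in raw_headers if key != b"host"]


# Hypothetical incoming request details, as a local dev server might see them.
incoming_url = "http://localhost:8000/search?q=asgi"
incoming_headers = [(b"host", b"localhost:8000"), (b"accept", b"*/*")]

upstream_url = rewrite_url(incoming_url, "www.google.com")
upstream_headers = strip_host(incoming_headers)
```

Everything except the scheme, the host and the Host header is forwarded as received, which is what lets the upstream server treat the proxied request like a direct one.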
@tomchristie
Author

I was also interested in seeing what it would look like if we had resources that we wanted to keep strictly inside an async with context-managed block...

    async def __call__(self, scope, receive, send):
        if scope['type'] == 'http':
            request = Request(scope, receive=receive)
            async with self.client.stream(
                method=self.get_method(request),
                url=self.get_url(request),
                headers=self.get_headers(request),
                data=self.get_body(request),
                allow_redirects=False
            ) as response:
                app = StreamingResponse(
                    status_code=response.status_code,
                    headers=response.headers,
                    content=response.aiter_raw()
                )
                await app(scope, receive, send)

        elif scope['type'] == 'lifespan':
            message = await receive()
            assert message['type'] == 'lifespan.startup'

            async with httpx.AsyncClient() as self.client:
                await send({'type': 'lifespan.startup.complete'})
                message = await receive()

            assert message['type'] == 'lifespan.shutdown'
            await send({'type': 'lifespan.shutdown.complete'})
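
For anyone wanting to see the ordering this gives, here is a small sketch that plays the server's role against a lifespan handler of the same shape, using only asyncio. The fake_client context manager is a hypothetical stand-in for httpx.AsyncClient() that just records when it opens and closes, and drive is a made-up helper, not how any particular ASGI server is implemented. Per the ASGI lifespan spec, the event the server sends at shutdown is lifespan.shutdown.

```python
import asyncio
from contextlib import asynccontextmanager

events = []  # records the order in which things happen


@asynccontextmanager
async def fake_client():
    # Hypothetical stand-in for httpx.AsyncClient(); only records open/close.
    events.append("client opened")
    try:
        yield
    finally:
        events.append("client closed")


async def lifespan_app(scope, receive, send):
    assert scope["type"] == "lifespan"
    message = await receive()
    assert message["type"] == "lifespan.startup"

    async with fake_client():
        await send({"type": "lifespan.startup.complete"})
        # Blocks here for the whole time the server is handling requests.
        message = await receive()

    assert message["type"] == "lifespan.shutdown"
    await send({"type": "lifespan.shutdown.complete"})


async def drive():
    # Plays the server: sends startup, "serves", then sends shutdown.
    to_app = asyncio.Queue()
    from_app = asyncio.Queue()

    async def send(message):
        events.append(message["type"])
        await from_app.put(message)

    task = asyncio.create_task(lifespan_app({"type": "lifespan"}, to_app.get, send))

    await to_app.put({"type": "lifespan.startup"})
    await from_app.get()  # wait for lifespan.startup.complete
    events.append("serving requests")
    await to_app.put({"type": "lifespan.shutdown"})
    await from_app.get()  # wait for lifespan.shutdown.complete
    await task


asyncio.run(drive())
```

The client is opened before startup is acknowledged and closed before shutdown is acknowledged, so it exists for exactly the period in which requests can arrive.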

@dhirschfeld

Hi @tomchristie,
@pgjones pointed me to this very useful snippet, so thanks for posting it! It seems to work great for HTTP requests, but the app I'm trying to proxy (a dask dashboard) also uses websockets. I figure I also have to implement something with the WebSocket class, but I'm a bit sketchy on the exact details of how to do that. If you have time to give some pointers, that would be very much appreciated. Otherwise, I'll post my solution here if I manage to get it working...
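
One possible shape for the websocket side, sketched at the raw ASGI message level rather than with Starlette's WebSocket class: accept the connection, then run two pump loops that copy frames in each direction until either side goes away. This is only a sketch under assumptions. The upstream object is hypothetical, with async send(data), recv() and close() methods, and recv() is assumed to return None once the upstream closes; a real websocket client library would need a small adapter to fit that interface. Subprotocols, headers and close codes are not forwarded here.

```python
import asyncio


async def proxy_websocket(scope, receive, send, upstream):
    """Relay frames between an ASGI websocket connection and `upstream`."""
    assert scope["type"] == "websocket"
    message = await receive()
    assert message["type"] == "websocket.connect"
    await send({"type": "websocket.accept"})

    async def client_to_upstream():
        while True:
            message = await receive()
            if message["type"] == "websocket.disconnect":
                return
            # A websocket.receive message carries either "text" or "bytes".
            data = message.get("text")
            if data is None:
                data = message.get("bytes")
            await upstream.send(data)

    async def upstream_to_client():
        while True:
            data = await upstream.recv()
            if data is None:  # assumed signal that upstream closed
                await send({"type": "websocket.close"})
                return
            key = "text" if isinstance(data, str) else "bytes"
            await send({"type": "websocket.send", key: data})

    tasks = [
        asyncio.create_task(client_to_upstream()),
        asyncio.create_task(upstream_to_client()),
    ]
    # Stop both directions as soon as either side finishes.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    for task in done:
        task.result()  # re-raise any error from the finished pump
    await upstream.close()


class EchoUpstream:
    """Fake upstream for the demo: echoes back whatever it is sent."""

    def __init__(self):
        self.sent = []
        self.closed = False
        self._incoming = asyncio.Queue()

    async def send(self, data):
        self.sent.append(data)
        await self._incoming.put("echo: " + data)

    async def recv(self):
        return await self._incoming.get()

    async def close(self):
        self.closed = True


async def demo():
    # Plays the ASGI server: connect, send one text frame, then disconnect.
    to_app = asyncio.Queue()
    from_app = asyncio.Queue()
    upstream = EchoUpstream()
    task = asyncio.create_task(
        proxy_websocket({"type": "websocket"}, to_app.get, from_app.put, upstream)
    )
    await to_app.put({"type": "websocket.connect"})
    accept = await from_app.get()
    await to_app.put({"type": "websocket.receive", "text": "hello"})
    reply = await from_app.get()
    await to_app.put({"type": "websocket.disconnect", "code": 1000})
    await task
    return accept, reply, upstream


accept, reply, upstream = asyncio.run(demo())
```

In the Proxy class above, this would hang off an extra branch on scope['type'] == 'websocket' in __call__, with the upstream connection opened inside an async with block in the same way as the HTTP stream.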
