Skip to content

Instantly share code, notes, and snippets.

@spchamp
Last active September 18, 2023 22:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save spchamp/f9c66c2b894714020bb8ebd156770c48 to your computer and use it in GitHub Desktop.
Save spchamp/f9c66c2b894714020bb8ebd156770c48 to your computer and use it in GitHub Desktop.
Callback-based I/O with HTTPX
## aiohttpx.py - callback-based HTTP client example
## when run, displays debugging information with a decoded
## JSON response from http://ip.jsontest.com
## requirements
## - Python 3 https://www.python.org/downloads/
## - HTTPX https://www.python-httpx.org/async/
from abc import abstractmethod, ABC
from dataclasses import dataclass
from typing import Callable, Literal, Mapping, Optional
from typing_extensions import Generic, TypeVar
import asyncio as aio
import httpx
from httpx._client import BaseClient
import json
from urllib import parse
# Type variables for the generic classes in this module.
# Ti: result type of the upstream ("input") future consumed by a FutureChain.
Ti = TypeVar("Ti")
# To: result type produced by a DataSource (the value stored in the future).
To = TypeVar("To")
# T: general-purpose type variable; not referenced in the visible code.
T = TypeVar('T')
class DataSource(aio.Future[To], ABC):
    """A future that computes its own result on demand.

    Subclasses implement :meth:`process`. Awaiting the instance, or calling
    :meth:`process_result` / :meth:`schedule`, runs ``process()`` and stores
    its outcome (value or exception) in the future, so later calls return
    the stored outcome instead of recomputing it.
    """

    def __init__(self, loop=None, name=None, context=None):
        """Initialize the underlying future.

        Args:
            loop: event loop passed through to ``asyncio.Future``.
            name: accepted for signature compatibility; currently unused.
            context: accepted for signature compatibility; currently unused.
        """
        super().__init__(loop=loop)

    def __hash__(self):
        # Identity-based hashing, independent of anything a subclass defines.
        return object.__hash__(self)

    @abstractmethod
    async def process(self) -> To:
        """Produce the value for this data source (implemented by subclasses)."""
        raise NotImplementedError(self.process)

    async def process_result(self) -> To:
        """Return this future's result, running :meth:`process` if needed.

        Returns:
            The stored result when the future is already done, otherwise
            the value produced by ``process()``.

        Raises:
            Exception: whatever ``process()`` raised. The exception is also
                stored on the future unless the future completed or was
                cancelled in the meantime.
        """
        if self.done():
            return self.result()
        try:
            value = await self.process()
        except Exception as exc:
            # Record the failure only while the future is still pending; a
            # concurrent caller may have completed or cancelled it while
            # process() was suspended. The exception object carries its own
            # traceback in ``__traceback__``.
            if not self.done():
                self.set_exception(exc)
            raise
        # A concurrent process_result() call may have finished first; keep
        # the outcome that is already stored rather than overwriting it.
        if not self.done():
            self.set_result(value)
        return self.result()

    def __await__(self):
        # ``await source`` is equivalent to ``await source.process_result()``.
        return self.process_result().__await__()

    def schedule(self) -> aio.Task:
        """Run :meth:`process_result` as a task on this future's event loop."""
        return self.get_loop().create_task(self.process_result())

    def __repr__(self):
        return "<%s at 0x%x>" % (self.__class__.__name__, id(self))

    def __str__(self):
        return repr(self)
class FutureChain(DataSource[To], Generic[Ti, To]):
    """A data source whose input is the result of another data source.

    Implementation note: ``process()`` must be implemented in derived
    classes; it typically awaits ``self.in_future.process_result()``.
    """

    @property
    def in_future(self) -> DataSource[Ti]:
        """The upstream data source this chain link consumes."""
        return self._in_future

    def __init__(self, in_future: DataSource[Ti], loop=None, *, defer=False):
        """Create a chain link fed by ``in_future``.

        Args:
            in_future: upstream data source.
            loop: event loop passed through to ``DataSource``.
            defer: when true, do not register the completion callback now;
                the caller may invoke :meth:`ensure_callbacks` later.
        """
        super().__init__(loop)
        self._in_future = in_future
        if not defer:
            self.ensure_callbacks()

    def ensure_callbacks(self):
        """Arrange for this link to be processed when the input completes."""
        self.in_future.add_done_callback(self._on_input_done)

    def _on_input_done(self, ftr):
        """Done-callback for the upstream future.

        Schedules ``process_result()`` (not bare ``process()``) so that the
        computed value or exception is stored on this future. Nothing is
        scheduled when this link has already completed, which avoids
        repeating the work.
        """
        if self.done():
            return
        ftr.get_loop().create_task(self.process_result())
#
# Example Application :
# Callback-based Request, Decoding, and Presentation
#
# Tr: return type of RequestBroker.dispatch_request(); bounded to httpx's BaseClient.
Tr = TypeVar('Tr', bound=BaseClient)
class RequestBroker(Generic[Tr]):
    """Holds the method and URL of a request that a subclass dispatches.

    ``dispatch_request`` is marked abstract, but this class does not derive
    from ``ABC`` itself, so the marker is only enforced when a subclass
    brings in an ABC-based metaclass.
    """

    # Request method name, e.g. "get".
    method: str
    # Target URL of the request.
    url: str

    def __init__(self, method: str, url: str):
        """Remember the request ``method`` and ``url``."""
        self.method, self.url = method, url

    @abstractmethod
    async def dispatch_request(self) -> Tr:
        """Send the request; must be provided by subclasses."""
        raise NotImplementedError(self.dispatch_request)
# NOTE(review): ``RequestBroker[bytes]`` does not satisfy the ``BaseClient``
# bound declared on ``Tr``, and ``dispatch_request`` below returns an
# ``httpx.Response``; the RequestBroker type parameter needs revisiting.
class BytesRequestSource(DataSource[str], RequestBroker[bytes]):
    """Data source that performs an HTTP request and yields the decoded body.

    The result stored in the future is the response body decoded to ``str``.
    """

    # Encoding used when the response does not declare a charset; ``None``
    # falls back to the default of ``bytes.decode()``.
    default_encoding: Optional[str]

    def __init__(self, method: str, url: str,
                 default_encoding: Optional[str] = None,
                 loop=None):
        """Configure the request.

        Args:
            method: name of the HTTP method; looked up (lower-cased) as an
                attribute of ``httpx.AsyncClient``.
            url: URL to request.
            default_encoding: fallback text encoding for the body.
            loop: event loop passed through to ``DataSource``.
        """
        DataSource.__init__(self, loop)
        RequestBroker.__init__(self, method, url)
        self.default_encoding = default_encoding

    async def dispatch_request(self) -> httpx.Response:
        """Send the request with a short-lived ``httpx.AsyncClient``."""
        async with httpx.AsyncClient() as client:
            client_mtd = getattr(client, self.method.lower())
            if __debug__:
                print("dispatch %r" % client_mtd)
            return await client_mtd(self.url)

    async def process(self) -> str:
        """Fetch the response and return its body decoded as text.

        The charset declared by the response takes precedence over
        ``default_encoding``. The response is always closed.
        """
        if __debug__:
            print("process() for %r" % self)
        response = await self.dispatch_request()
        if __debug__:
            print("process() for %r => response %r" % (self, response))
        try:
            ## FIXME if response content type is not application/json,
            ## downstream consumers should not parse the body as JSON.
            ## Proxy servers may introduce an error page in HTML format.
            # Collect the chunks and join once instead of repeated ``+=``.
            chunks = [chunk async for chunk in response.aiter_bytes()]
            data = b"".join(chunks)
            encoding = response.charset_encoding or self.default_encoding
            if __debug__:
                print("done: %r" % self)
            if encoding:
                return data.decode(encoding)
            return data.decode()
        finally:
            await response.aclose()
class ParseChain(FutureChain[bytes, Mapping]):
    """Chain link that decodes the upstream result as JSON."""

    async def process(self) -> Mapping:
        """Await the upstream result and return ``json.loads`` of it."""
        if __debug__:
            print("process() for %r" % self)
        upstream = self.in_future
        payload = await upstream.process_result()
        if __debug__:
            print("parsing %r" % payload)
        decoded = json.loads(payload)
        if __debug__:
            print("parsed %r" % decoded)
        return decoded
class PresentationChain(FutureChain[Mapping, bool]):
    """Chain link that prints the upstream result.

    The declared output type is ``bool`` because ``process()`` returns
    ``True`` after printing.
    """

    async def process(self) -> bool:
        """Await the upstream result, print it, and return ``True``."""
        if __debug__:
            print("process() for %r" % self)
        rslt = await self.in_future.process_result()
        print("Response: %s" % str(rslt))
        return True
async def run_example(mtd, req):
    """Run the request -> parse -> present chain and print each stage's result.

    Args:
        mtd: HTTP method name handed to ``BytesRequestSource``.
        req: URL to request.
    """
    running_loop = aio.get_running_loop()
    bytesource = BytesRequestSource(method=mtd, url=req, loop=running_loop)
    parser = ParseChain(bytesource, loop=running_loop)
    presenter = PresentationChain(parser, loop=running_loop)
    if __debug__:
        print("running request")
    ## Alternatives noted as working in place of ``await presenter``:
    ##   await presenter.schedule()
    ##   await parser.schedule()
    ##   await parser
    await presenter
    # Report the result of every stage, in pipeline order.
    stages = (
        ("Bytesource => %r", bytesource),
        ("Parser => %r", parser),
        ("Presenter => %r", presenter),
    )
    for template, stage in stages:
        print(template % await stage.process_result())
if __name__ == "__main__":
    # asyncio.run() creates a fresh event loop, runs the coroutine to
    # completion and closes the loop afterwards. It replaces the deprecated
    # pattern of fetching a loop from the event loop policy.
    aio.run(run_example("get", "http://ip.jsontest.com"))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment