Skip to content

Instantly share code, notes, and snippets.

@spchamp
Last active September 21, 2023 18:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save spchamp/f6d6f37a8212918cfa66896a0e79c082 to your computer and use it in GitHub Desktop.
Save spchamp/f6d6f37a8212918cfa66896a0e79c082 to your computer and use it in GitHub Desktop.
async HTTP client produces segfault on Windows, "Windows fatal exception: access violation"
## async_httpx.py - callback-based HTTP client example
## (Public Domain)
from abc import abstractmethod, ABC
import asyncio as aio
import codecs
import json
from typing import AsyncIterator, Mapping, Optional
from urllib import parse

from typing_extensions import Generic, TypeVar
import httpx
from httpx._client import BaseClient
# Input value type of a FutureProcess: the type its input_source produces.
Ti = TypeVar("Ti")
# Output value type of a FutureData / FutureProcess: the type run() returns.
To = TypeVar("To")
class FutureData(aio.Future[To], ABC):
    """Data Source for a FutureProcess.

    An asyncio Future whose value is produced by the coroutine ``run()``,
    which subclasses must implement. ``process()`` is the result accessor:
    while the future is pending it awaits ``run()`` and records the outcome
    on the future; once the future is done it returns the recorded result.
    Awaiting the object directly delegates to ``process()``.
    """

    def __init__(self, *, loop=None, name=None, context=None):
        # NOTE(review): ``name`` and ``context`` are accepted but unused here.
        super().__init__(loop=loop)

    @abstractmethod
    async def run(self) -> To:
        """Compute this future's value; implemented by subclasses."""
        raise NotImplementedError(self.run)

    async def process(self) -> To:
        """Return this future's result, running ``run()`` first if still pending.

        Raises:
            Whatever ``run()`` raises. The exception is also stored on the
            future unless another caller completed it in the meantime.
        """
        if self.done():
            return self.result()
        try:
            self.set_result(await self.run())
        except aio.InvalidStateError:
            # Another caller completed the future while run() was suspended;
            # hand back what that caller recorded.
            return self.result()
        except Exception as exc:
            if not self.done():
                try:
                    self.set_exception(exc)
                except aio.InvalidStateError:
                    # Completed concurrently; the original error is still
                    # re-raised below.
                    pass
            raise
        return self.result()

    def __await__(self):
        # ``await obj`` runs (or reuses) process() instead of the plain
        # Future wait.
        return self.process().__await__()

    def schedule(self) -> aio.Task:
        """Run ``process()`` as a Task on this future's loop and return the Task."""
        return self.get_loop().create_task(self.process())

    def __repr__(self):
        return "<%s at 0x%x>" % (self.__class__.__name__, id(self))

    def __str__(self):
        return repr(self)
class FutureProcess(FutureData[To], Generic[Ti, To]):
    """A FutureData whose value is derived from another FutureData.

    Implementation Note:
        ``run()`` must be implemented in derived classes (typically awaiting
        ``self.input_source.process()``); ``process()`` should be called as
        the result accessor. Unless ``defer`` is true, ``process()`` is also
        scheduled automatically once the input source completes.
    """

    @property
    def input_source(self) -> FutureData[Ti]:
        """The upstream FutureData this process consumes."""
        return self._input_source

    def __init__(self, input_source: FutureData[Ti], loop=None, *, defer=False):
        super().__init__(loop=loop)
        self._input_source = input_source
        # Strong references to tasks spawned by the input-done callback. The
        # event loop keeps only weak references to tasks, so an unreferenced
        # task could be garbage-collected before it finishes.
        self._callback_tasks = set()
        if not defer:
            self.ensure_callbacks()

    def ensure_callbacks(self):
        """Schedule ``process()`` on the input's loop when the input completes."""
        self.input_source.add_done_callback(self._on_input_done)

    def _on_input_done(self, future):
        # Done-callback for the input source: start process() as a Task and
        # keep it referenced until it finishes.
        task = future.get_loop().create_task(self.process())
        pending = self._callback_tasks
        pending.add(task)
        task.add_done_callback(pending.discard)
#
# Example Application :
# Callback-based Request, Decoding, and Presentation
#
# httpx client type a RequestBroker subclass is parameterized with; bound to
# httpx's BaseClient, which is imported from the private httpx._client module.
Tr = TypeVar('Tr', bound=BaseClient)
class RequestBroker(FutureData[str], Generic[Tr]):
    """FutureData that performs an HTTP request and produces the body as text.

    Subclasses implement ``dispatch_request()`` as an async generator that
    yields an ``httpx.Response`` and releases the response and client in its
    cleanup. ``run()`` consumes the first yielded response, rejects a
    declared content type other than ``application/json``, and decodes the
    body.

    Attributes:
        method: HTTP method passed to the client.
        url: Request URL.
        default_encoding: Charset used when the response declares none;
            ``None`` means UTF-8.
    """

    method: str
    url: str
    default_encoding: Optional[str]

    def __init__(self, *, method: str, url: str,
                 default_encoding: Optional[str] = None,
                 loop=None):
        super().__init__(loop=loop)
        self.method = method
        self.url = url
        self.default_encoding = default_encoding

    @abstractmethod
    async def dispatch_request(self) -> AsyncIterator[httpx.Response]:
        """Yield the response for this request; implemented by subclasses."""
        raise NotImplementedError(self.dispatch_request)

    async def run(self) -> str:
        """Dispatch the request and return the decoded text of the first response.

        Returns None if ``dispatch_request()`` yields nothing.

        Raises:
            RuntimeError: if the response declares a non-JSON content type.
        """
        if __debug__:
            print("process() for %r" % self)
        responses = self.dispatch_request()
        try:
            async for response in responses:
                if __debug__:
                    print("process() for %r => response %r" % (self, response))
                content_type = response.headers.get("Content-Type")
                # Compare only the media type, so parameters such as
                # "; charset=utf-8" are accepted (media types are
                # case-insensitive).
                if content_type and (
                    content_type.split(";", 1)[0].strip().lower()
                    != "application/json"
                ):
                    ## a proxy server may introduce an HTML error response page
                    raise RuntimeError("Unsupported content type in response", content_type)
                encoding = response.charset_encoding or self.default_encoding or "utf-8"
                # Incremental decoding: a multi-byte character may be split
                # across two chunks.
                decoder = codecs.getincrementaldecoder(encoding)()
                parts = [decoder.decode(chunk) async for chunk in response.aiter_bytes()]
                parts.append(decoder.decode(b"", final=True))
                if __debug__:
                    print("done: %r" % self)
                return "".join(parts)
        finally:
            # Returning or raising inside ``async for`` leaves the generator
            # suspended at its ``yield``. Close it here so its cleanup (closing
            # the response and client) runs now on this loop, not whenever the
            # generator is garbage-collected.
            aclose = getattr(responses, "aclose", None)
            if aclose is not None:
                await aclose()
class TextRequestSource(RequestBroker[httpx.AsyncClient]):
    """RequestBroker that sends one buffered (non-streaming) request."""

    async def dispatch_request(self) -> AsyncIterator[httpx.Response]:
        """Send the request with a fresh AsyncClient and yield its response.

        The response and then the client are closed when the generator is
        closed or finishes.
        """
        async with httpx.AsyncClient() as client:
            if __debug__:
                print("dispatch %r" % self.method)
            # Only a response that was actually obtained needs closing; test
            # identity with None rather than relying on object truthiness.
            response = None
            try:
                response = await client.request(self.method, self.url)
                yield response
            finally:
                if response is not None:
                    await response.aclose()
class TextStreamSource(RequestBroker[httpx.AsyncClient]):
    """RequestBroker that streams the response using an HTTP/2-enabled client."""

    async def dispatch_request(self) -> AsyncIterator[httpx.Response]:
        """Open a streaming request and yield its response.

        The client is constructed before any cleanup is registered. If
        construction fails (presumably e.g. httpx rejecting ``http2=True``
        when the optional ``h2`` package is missing), that error propagates
        as-is instead of an UnboundLocalError from cleanup code.
        """
        async with httpx.AsyncClient(http2=True) as client:
            if __debug__:
                print("dispatch %r" % self.method)
            # client.stream() closes the response when this block exits.
            async with client.stream(self.method, self.url) as response:
                yield response
class ParserProcess(FutureProcess[str, Mapping]):
    """Stage that decodes the upstream text with ``json.loads``."""

    async def run(self) -> Mapping:
        """Await the input source's text and return the JSON-decoded value."""
        if __debug__:
            print("process() for %r" % self)
        payload = await self.input_source.process()
        if __debug__:
            payload_kind = type(payload).__name__
            print("parsing %s %r" % (payload_kind, payload))
        decoded = json.loads(payload)
        if __debug__:
            print("parsed %r" % decoded)
        return decoded
class PresentationProcess(FutureProcess[Mapping, bool]):
    """Final stage: print the value produced by the input source.

    ``run()`` returns ``True`` after printing, so the output type parameter
    and the return annotation are ``bool``.
    """

    async def run(self) -> bool:
        """Await the input source's value, print it, and return True."""
        if __debug__:
            print("process() for %r" % self)
        rslt = await self.input_source.process()
        print("Request => %s" % repr(rslt))
        return True
async def run_example(mtd, req):
    """Fetch *req* with HTTP method *mtd*, parse it as JSON, and print each stage.

    Builds the chain TextRequestSource -> ParserProcess -> PresentationProcess
    on the running loop, drives it by scheduling the parser, then prints the
    result of every stage.
    """
    loop = aio.get_running_loop()
    ## alternatives for dispatch with HTTPX - direct or streamed request
    ## - the streamed variant enables HTTP/2 in the client, which needs the
    ##   optional 'h2' package; httpx uses HTTP/1.1 if HTTP/2 is not negotiated
    text_source = TextRequestSource(method=mtd, url=req, loop=loop)
    # text_source = TextStreamSource(method=mtd, url=req, loop=loop)
    parser = ParserProcess(text_source, loop=loop)
    presenter = PresentationProcess(parser, loop=loop)
    if __debug__:
        print("running request")
    scheduled: aio.Task = parser.schedule()
    await scheduled
    print("Bytesource => %r" % await text_source.process())
    print("Parser => %r" % await parser.process())
    print("Presenter => %r" % await presenter.process())
if __name__ == "__main__":
    ## workaround for "Windows fatal exception: access violation"
    ## - using a selector event loop with Python on Windows
    # import sys
    # loop = aio.SelectorEventLoop() if sys.platform == "win32" else aio.new_event_loop()
    ## default: the platform's default event loop (Proactor on Windows),
    ## the configuration that showed the access violation - kept for debug
    loop = aio.new_event_loop()
    try:
        loop.run_until_complete(run_example("get", "http://ip.jsontest.com"))
    finally:
        # Finalize still-suspended async generators (such as a
        # dispatch_request() that owns an httpx client) and the default
        # executor while the loop can still run them, then close the loop.
        try:
            loop.run_until_complete(loop.shutdown_asyncgens())
            loop.run_until_complete(loop.shutdown_default_executor())
        finally:
            loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment