Last active
September 21, 2023 18:51
-
-
Save spchamp/f6d6f37a8212918cfa66896a0e79c082 to your computer and use it in GitHub Desktop.
async HTTP client produces segfault on Windows, "Windows fatal exception: access violation"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## async_httpx.py - callback-based HTTP client example | |
## (Public Domain) | |
import asyncio as aio
import json
from abc import abstractmethod, ABC
from typing import AsyncIterator, Mapping, Optional
from urllib import parse

import httpx
from httpx._client import BaseClient
from typing_extensions import Generic, TypeVar
Ti = TypeVar("Ti")
To = TypeVar("To")


class FutureData(aio.Future[To], ABC):
    """Data Source for a FutureProcess.

    A future that computes its own result: ``process()`` awaits the
    subclass-provided ``run()`` coroutine and stores the outcome (value or
    exception) on the future itself.  Awaiting the instance is equivalent
    to awaiting ``process()``.
    """

    def __init__(self, *, loop=None, name=None, context=None):
        # ``name`` and ``context`` are accepted but currently unused; only
        # ``loop`` is forwarded to the underlying future.
        super().__init__(loop=loop)

    @abstractmethod
    async def run(self) -> To:
        """Produce the value for this future; implemented by subclasses."""
        raise NotImplementedError(self.run)

    async def process(self) -> To:
        """Return the result of this future, running ``run()`` if needed.

        If the future is already done its stored outcome is returned (or
        raised).  Otherwise ``run()`` is awaited; a returned value becomes
        the future's result, and an exception derived from ``Exception``
        is stored on the future and re-raised to the caller.

        If the future was completed by someone else while ``run()`` was
        in flight, the already-stored outcome wins and the fresh value is
        discarded.
        """
        if self.done():
            return self.result()
        # Only run() is guarded here, so an InvalidStateError raised by
        # run() itself is reported as-is instead of being mistaken for a
        # lost race in set_result().
        try:
            value = await self.run()
        except Exception as exc:
            # There is no await between the check and the store, so the
            # state cannot change in between.
            if not self.done():
                self.set_exception(exc)
            raise
        if not self.done():
            self.set_result(value)
        return self.result()

    def __await__(self):
        # ``await obj`` drives process() rather than merely waiting for
        # someone else to complete the future.
        return self.process().__await__()

    def schedule(self) -> aio.Task:
        """Create a task running ``process()`` on this future's loop."""
        return self.get_loop().create_task(self.process())

    def __repr__(self):
        return "<%s at 0x%x>" % (self.__class__.__name__, id(self))

    def __str__(self):
        return repr(self)
class FutureProcess(FutureData[To], Generic[Ti, To]):
    """A FutureData whose ``run()`` consumes another FutureData.

    Implementation notes:
    - ``run()`` must be implemented in derived classes
    - ``process()`` should be called as the result accessor

    Unless ``defer`` is true, construction registers a done-callback on
    ``input_source`` that starts ``process()`` for this object once the
    input has completed.
    """

    def __init__(self, input_source: FutureData[Ti], loop=None, *, defer=False):
        super().__init__(loop=loop)
        self._input_source = input_source
        self._callbacks_installed = False
        # Strong references to the tasks started from the done-callback;
        # the event loop itself only keeps weak references to tasks.
        self._callback_tasks = set()
        if not defer:
            self.ensure_callbacks()

    @property
    def input_source(self) -> FutureData[Ti]:
        """The upstream future whose result this process consumes."""
        return self._input_source

    def ensure_callbacks(self):
        """Register the done-callback on the input source, at most once."""
        if self._callbacks_installed:
            return
        self._callbacks_installed = True
        self.input_source.add_done_callback(self._on_input_done)

    def _on_input_done(self, future):
        # Start process() on the loop of the completed input future and
        # keep the task referenced until it has finished.
        task = future.get_loop().create_task(self.process())
        self._callback_tasks.add(task)
        task.add_done_callback(self._callback_tasks.discard)
# | |
# Example Application : | |
# Callback-based Request, Decoding, and Presentation | |
# | |
Tr = TypeVar('Tr', bound=BaseClient)


class RequestBroker(FutureData[str], Generic[Tr]):
    """Issue one HTTP request and deliver the JSON response body as text.

    Subclasses implement ``dispatch_request()`` as an async generator
    that yields the response and releases its resources when closed.
    """

    method: str
    url: str
    default_encoding: Optional[str]

    def __init__(self, *, method: str, url: str,
                 default_encoding: Optional[str] = None,
                 loop=None):
        super().__init__(loop=loop)
        self.method = method
        self.url = url
        self.default_encoding = default_encoding

    @abstractmethod
    async def dispatch_request(self) -> AsyncIterator[httpx.Response]:
        """Yield the response for ``self.method`` / ``self.url``."""
        raise NotImplementedError(self.dispatch_request)
        yield  # unreachable; its presence makes this stub an async generator

    async def run(self) -> str:
        """Return the decoded body of the first response that is yielded.

        Raises:
            RuntimeError: if the response declares a media type other
                than ``application/json``, or if no response is yielded.
        """
        if __debug__:
            print("process() for %r" % self)
        responses = self.dispatch_request()
        try:
            async for response in responses:
                if __debug__:
                    print("process() for %r => response %r" % (self, response))
                self._require_json(response)
                data = await self._read_text(response)
                if __debug__:
                    print("done: %r" % self)
                return data
        finally:
            # Close the generator here so that its cleanup (closing the
            # response and the client) runs in this task, on both the
            # success and the error path, instead of being left to
            # garbage collection.
            aclose = getattr(responses, "aclose", None)
            if aclose is not None:
                await aclose()
        raise RuntimeError("No response received", self.method, self.url)

    @staticmethod
    def _require_json(response) -> None:
        # Reject a response whose declared media type is not JSON.
        content_type = response.headers.get("Content-Type")
        if not content_type:
            return
        # Compare the media type only: header parameters such as
        # "; charset=utf-8" must not cause a rejection.
        media_type = content_type.split(";", 1)[0].strip().lower()
        if media_type != "application/json":
            ## a proxy server may introduce an HTML error response page
            raise RuntimeError("Unsupported content type in response", content_type)

    async def _read_text(self, response) -> str:
        # Collect the whole body, then decode it in one step.
        encoding = response.charset_encoding or self.default_encoding
        # Decoding chunk by chunk would break a multi-byte character that
        # straddles a chunk boundary.
        body = b"".join([chunk async for chunk in response.aiter_bytes()])
        if encoding:
            return body.decode(encoding)
        return body.decode()
class TextRequestSource(RequestBroker[httpx.AsyncClient]):
    """RequestBroker that sends the request with ``AsyncClient.request()``."""

    async def dispatch_request(self) -> AsyncIterator[httpx.Response]:
        """Yield the response, then close the response and the client."""
        # The context manager closes the client even when sending the
        # request or closing the response raises.
        async with httpx.AsyncClient() as client:
            if __debug__:
                print("dispatch %r" % self.method)
            response = await client.request(self.method, self.url)
            try:
                yield response
            finally:
                await response.aclose()
class TextStreamSource(RequestBroker[httpx.AsyncClient]):
    """RequestBroker that streams the response with ``AsyncClient.stream()``.

    The client is created with ``http2=True``.
    """

    async def dispatch_request(self) -> AsyncIterator[httpx.Response]:
        """Yield the streaming response, then close it and the client."""
        # Creating the client in the ``async with`` header means a failing
        # constructor propagates its own error; there is no cleanup code
        # that could touch a client that was never created.
        async with httpx.AsyncClient(http2=True) as client:
            if __debug__:
                print("dispatch %r" % self.method)
            # The stream context manager closes the response on exit.
            async with client.stream(self.method, self.url) as response:
                yield response
class ParserProcess(FutureProcess[str, Mapping]):
    """Parse the text produced by the input source as JSON."""

    async def run(self) -> Mapping:
        """Await the input source and return its text decoded by json.loads."""
        if __debug__:
            print("process() for %r" % self)
        text = await self.input_source.process()
        if __debug__:
            type_name = text.__class__.__name__
            print("parsing %s %r" % (type_name, text))
        parsed = json.loads(text)
        if __debug__:
            print("parsed %r" % parsed)
        return parsed
class PresentationProcess(FutureProcess[Mapping, bool]):
    """Print the value produced by the input source."""

    async def run(self) -> bool:
        """Await the input source, print its result, and return True.

        The result type is declared as ``bool`` because ``True`` is what
        this method has always returned and what is stored as the
        future's result.
        """
        if __debug__:
            print("process() for %r" % self)
        rslt = await self.input_source.process()
        print("Request => %s" % repr(rslt))
        return True
async def run_example(mtd, req):
    """Run the request -> parse -> present chain once and print each result.

    ``mtd`` is passed to the request source as the HTTP method and
    ``req`` as the URL.
    """
    running_loop = aio.get_running_loop()
    ## alternatives for dispatch with HTTPX - direct or stream request
    ## - TextRequestSource sends the request with client.request()
    ## - TextStreamSource uses client.stream() on a client created with
    ##   http2=True; substitute it below to try the streaming variant
    source = TextRequestSource(method=mtd, url=req, loop=running_loop)
    parser = ParserProcess(source, loop=running_loop)
    presenter = PresentationProcess(parser, loop=running_loop)
    if __debug__:
        print("running request")
    # Drive the parser as a task; it awaits the request source itself.
    await parser.schedule()
    source_result = await source.process()
    print("Bytesource => %r" % source_result)
    parser_result = await parser.process()
    print("Parser => %r" % parser_result)
    presenter_result = await presenter.process()
    print("Presenter => %r" % presenter_result)
if __name__ == "__main__":
    ## workaround for "Windows fatal exception: access violation"
    ## - using a selector event loop with Python on Windows
    # import sys
    # loop = aio.SelectorEventLoop() if sys.platform == "win32" else aio.new_event_loop()
    ## alternative: activating the segfault on Windows, for purpose of debug
    ## - new_event_loop() creates the platform's default event loop
    loop = aio.new_event_loop()
    try:
        loop.run_until_complete(run_example("get", "http://ip.jsontest.com"))
    finally:
        # Finalize any async generator that is still open before the loop
        # goes away, and close the loop even when the example fails.
        try:
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment