-
-
Save xpinguin/e77a4a817aa1b1590de1743fa41f6671 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def to_async_corofunc(sync_func, *, _loop :asyncio.AbstractEventLoop = None, _timeout = None, _iter_protect = True): | |
""" | |
F -> await <loop>.run_in_executor(F) | |
""" | |
if inspect.iscoroutinefunction(sync_func): | |
return sync_func | |
## | |
if (inspect.ismethoddescriptor(sync_func) or inspect.ismethod(sync_func)): | |
_partial = partialmethod | |
elif (callable(sync_func)): | |
_partial = partial | |
else: | |
raise TypeError("sync_func :%s ='%s': is not callable" % (type(sync_func).__name__, sync_func), sync_func) | |
if (_iter_protect): | |
unprotected_sync_func = sync_func | |
@wraps(unprotected_sync_func) | |
def sync_func(*args, **kwargs): | |
try: | |
return unprotected_sync_func(*args, **kwargs) | |
except StopIteration as exc: | |
raise StopAsyncIteration() from exc | |
@wraps(sync_func) | |
async def async_func_wrap(*args, **kwargs): | |
loop = _loop or asyncio.get_event_loop() | |
return await asyncio.wait_for(loop.run_in_executor(None, | |
((sync_func) | |
if (not kwargs) else | |
_partial(sync_func, **kwargs)), | |
*args | |
), _timeout, loop = loop) | |
return async_func_wrap | |
def to_sync_func(async_corofunc, *, _loop = None, _timeout = None): | |
""" | |
coroF -> asyncio.run_coroutine_threadsafe(coroF).<wait>(...) | |
""" | |
@wraps(async_corofunc) | |
def sync_corofunc_wrap(*args, **kwargs): | |
loop = _loop or asyncio.get_event_loop() | |
# TODO/Q: shall we cancel on timeout? (ie. catch asyncio.TimeoutError) | |
return asyncio.run_coroutine_threadsafe( | |
async_corofunc(*args, **kwargs), | |
loop = loop | |
).result(_timeout) | |
return sync_corofunc_wrap |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class WSGIApp(with_EventLoop): | |
# ----- | |
DEFAULT_SERVER_PORT = 80 | |
IGNORED_HEADERS = { # Q: or just rely on .HOP_HEADERS somewhere inside aiohttp? | |
hdrs.CONNECTION, | |
hdrs.KEEP_ALIVE, | |
hdrs.PROXY_AUTHENTICATE, | |
hdrs.PROXY_AUTHORIZATION, | |
hdrs.TE, | |
hdrs.TRAILER, | |
hdrs.TRANSFER_ENCODING, | |
hdrs.UPGRADE, | |
} | |
# ----- | |
def __init__(self, app, *, is_async = False, **kwargs): | |
super().__init__(**kwargs) | |
self.wsgi_app = app | |
self.is_async = is_async | |
self._log = logging.getLogger(str(self)) | |
def __str__(self): | |
return "%s(%s)" % (self.__class__.__name__, str(self.wsgi_app)) | |
def create_wsgi_environ(self, req :web.Request, content): | |
"""(adapted from: aiohttp.wsgi.WSGIServerHttpProtocol)""" | |
headers = req.headers | |
server_host_port = headers.get(hdrs.HOST, "<unknown-host>").split(":") | |
if (len(server_host_port) == 1): | |
server_host_port.append(self.DEFAULT_SERVER_PORT) | |
environ = { # TODO: lazy dict | |
"wsgi.input": content, | |
"wsgi.errors": sys.stderr, | |
"wsgi.version": (1, 0), | |
"wsgi.async": self.is_async, | |
"wsgi.multithread": False, | |
"wsgi.multiprocess": False, | |
"wsgi.run_once": False, | |
"wsgi.file_wrapper": None, # TODO: use FileSender | |
"wsgi.url_scheme": req.scheme, | |
"SERVER_NAME": server_host_port[0], | |
"SERVER_PORT": int(server_host_port[1]), | |
"SERVER_SOFTWARE": aiohttp.HttpMessage.SERVER_SOFTWARE, | |
"REQUEST_METHOD": req.method, | |
"SERVER_PROTOCOL": "HTTP/%s.%s" % req.version, | |
"SCRIPT_NAME": "", # TODO: use "mount-point" (stable path prefix) of self | |
"PATH_INFO": req.path, | |
"QUERY_STRING": unquote_plus(req.query_string), | |
} | |
if (req.has_body): | |
environ["CONTENT_TYPE"] = req.content_type | |
try: | |
environ["CONTENT_LENGTH"] = headers[hdrs.CONTENT_LENGTH] | |
except KeyError: | |
pass | |
# CGI-compatible (Q: is correct?) | |
#for optional_hdr in ("REMOTE_USER", "REMOTE_HOST", "REMOTE_ADDR", "REMOTE_PORT",): | |
# try: | |
# environ[optional_hdr] = headers[optional_hdr] | |
# except KeyError: | |
# pass | |
# HTTP_* (+ HTTP_HOST) | |
for other_hdr, v in headers.items(): | |
other_hdr = other_hdr.replace("-", "_") | |
if (other_hdr in environ): | |
continue | |
other_hdr = "HTTP_" + other_hdr | |
prev_v = environ.get(other_hdr, None) | |
if (not prev_v is None): | |
v = prev_v+","+v | |
environ[other_hdr] = v | |
#if (self.is_async): # TODO: | |
# environ['async.reader'] = ... | |
# environ['async.writer'] = ... | |
assert (not environ['SCRIPT_NAME']) | |
return environ | |
def start_wsgi_response(self, status_reason, headers, exc_info = None, *, _respreq_ctx :dict): | |
""" | |
start_response() callable as in PEP 3333 | |
(see also aiohttp.wsgi.WsgiResponse) | |
""" | |
if exc_info: | |
if _respreq_ctx.get("status", None): | |
raise exc_info[1] | |
status_reason = status_reason.split(' ', 1) | |
status = status_reason[0] | |
try: | |
reason = status_reason[1] | |
except IndexError: | |
reason = None | |
_respreq_ctx["status"] = int(status) | |
_respreq_ctx["reason"] = reason | |
_respreq_ctx["headers"] = { hdr : v for hdr, v in headers if (not hdr in self.IGNORED_HEADERS)} | |
def _wsgi_write_data(data): | |
try: | |
return _respreq_ctx["_out"].write(data) | |
except KeyError: | |
_respreq_ctx["_out"] = buf = BytesIO() | |
self._log.debug("wsgi_write(...): output buffer created") | |
return buf.write(data) | |
return _wsgi_write_data | |
async def handle_request(self, req :web.Request): | |
# -- | |
if (not self.is_async): | |
content = SyncStreamReader(req.content, loop = self._loop) | |
else: | |
content = req.content | |
# -- | |
ctx = {} | |
resp = None | |
def create_response__drain_wsgi_result(wsgi_result): | |
nonlocal resp | |
# | |
next_async = to_async_corofunc(next, _loop = self._loop) | |
_first_results = [] | |
# | |
async def _prepare_resp(): | |
while (not "status" in ctx): | |
r = await next_async(wsgi_result) | |
if (not r is None): | |
_first_results.append(r) | |
# | |
nonlocal resp | |
resp = web.StreamResponse( | |
status = ctx["status"], reason = ctx["reason"], headers = ctx["headers"] | |
) | |
await resp.prepare(req) | |
yield _prepare_resp() | |
# | |
try: | |
yield ctx["_out"].read() | |
except KeyError: | |
pass | |
finally: | |
ctx.clear() | |
# | |
yield from _first_results | |
# | |
while wsgi_result: # (or just True) | |
yield next_async(wsgi_result) | |
async with AsyncContextManagerGuard( | |
to_async_corofunc(self.wsgi_app, _loop = self._loop)( | |
self.create_wsgi_environ(req, content), | |
lambda *a: self.start_wsgi_response(*a, _respreq_ctx = ctx) | |
) | |
) as wsgi_result: | |
async for datum in AwaitTransparent_Reducing_AsyncGenerator( | |
create_response__drain_wsgi_result(wsgi_result) | |
): | |
resp.write(datum) | |
await resp.drain() | |
# -- | |
await resp.write_eof() # Q: shall we close response stream explicitly?! | |
return resp |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class with_EventLoop: | |
def __init__(self, *, loop = None): | |
self._loop = loop or asyncio.get_event_loop() | |
@property | |
def aio_loop(self): | |
return self._loop | |
class AsyncContextManagerGuard: | |
def __init__(self, enter_corofunc, exit_corofunc = None, *, guarded = None): | |
if (not inspect.iscoroutine(enter_corofunc)): | |
enter_corofunc = asyncio.coroutine(enter_corofunc) | |
self._enter = enter_corofunc | |
self._guarded = guarded | |
# | |
self._exit = None | |
if (not exit_corofunc is None): | |
self._exit_func = asyncio.coroutine(exit_corofunc) | |
async def __aenter__(self): | |
assert (not self._enter is None) | |
if (inspect.iscoroutine(self._enter)): | |
ret = await self._enter | |
self._enter = None | |
else: | |
ret = await self._enter() | |
return self._guarded or ret | |
async def __aexit__(self, *exc_info): | |
if (not self._exit is None): | |
try: | |
await self._exit(exc_info) | |
except TypeError: | |
await self._exit() | |
class SyncStreamReader(with_EventLoop): | |
def __init__(self, async_stream, **kwargs): | |
super().__init__(**kwargs) | |
self.async_stream = async_stream | |
# TODO: use separate executor for stream reader | |
# OR (better) create intelligent ThreadPoolExecutor | |
self._read = to_sync_func(async_stream.read, _loop = self._loop) | |
def read(self, *args, **kwargs): | |
return self._read(*args, **kwargs) | |
class AwaitTransparent_Reducing_AsyncGenerator: | |
def __init__(self, gen): | |
""" | |
:param gen: synchronous generator | |
TODO: 1) support for iterable: list, ...; | |
2) support for asynchronous generator,genfunc,iterable?? | |
""" | |
self._gen_func = None | |
self._gen = None | |
if (inspect.isfunction(gen)): | |
self._gen_func = gen | |
if (inspect.isgenerator(gen)): | |
self._gen = gen | |
# TODO: <3.5.2 compatibility, | |
# see: https://docs.python.org/3/reference/datamodel.html#asynchronous-iterators | |
def __aiter__(self): | |
if (self._gen_func): | |
self._gen = self._gen_func() or self._gen | |
return self | |
async def __anext__(self): | |
while (inspect.isawaitable(self._gen)): # <=> "AwaitTransparent"/sequence-container | |
self._gen = await self._gen | |
# | |
n = None | |
while (n is None): # <=> "Reducing" | |
try: | |
n = next(self._gen) | |
except StopIteration as exc: | |
raise StopAsyncIteration() from exc | |
# | |
while (inspect.isawaitable(n)): # <=> "AwaitTransparent"/elements | |
n = await n | |
assert (not n is None) | |
return n |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment