Skip to content

Instantly share code, notes, and snippets.

@xpinguin
Created November 17, 2016 21:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save xpinguin/e77a4a817aa1b1590de1743fa41f6671 to your computer and use it in GitHub Desktop.
Save xpinguin/e77a4a817aa1b1590de1743fa41f6671 to your computer and use it in GitHub Desktop.
def to_async_corofunc(sync_func, *, _loop :asyncio.AbstractEventLoop = None, _timeout = None, _iter_protect = True):
"""
F -> await <loop>.run_in_executor(F)
"""
if inspect.iscoroutinefunction(sync_func):
return sync_func
##
if (inspect.ismethoddescriptor(sync_func) or inspect.ismethod(sync_func)):
_partial = partialmethod
elif (callable(sync_func)):
_partial = partial
else:
raise TypeError("sync_func :%s ='%s': is not callable" % (type(sync_func).__name__, sync_func), sync_func)
if (_iter_protect):
unprotected_sync_func = sync_func
@wraps(unprotected_sync_func)
def sync_func(*args, **kwargs):
try:
return unprotected_sync_func(*args, **kwargs)
except StopIteration as exc:
raise StopAsyncIteration() from exc
@wraps(sync_func)
async def async_func_wrap(*args, **kwargs):
loop = _loop or asyncio.get_event_loop()
return await asyncio.wait_for(loop.run_in_executor(None,
((sync_func)
if (not kwargs) else
_partial(sync_func, **kwargs)),
*args
), _timeout, loop = loop)
return async_func_wrap
def to_sync_func(async_corofunc, *, _loop = None, _timeout = None):
"""
coroF -> asyncio.run_coroutine_threadsafe(coroF).<wait>(...)
"""
@wraps(async_corofunc)
def sync_corofunc_wrap(*args, **kwargs):
loop = _loop or asyncio.get_event_loop()
# TODO/Q: shall we cancel on timeout? (ie. catch asyncio.TimeoutError)
return asyncio.run_coroutine_threadsafe(
async_corofunc(*args, **kwargs),
loop = loop
).result(_timeout)
return sync_corofunc_wrap
class WSGIApp(with_EventLoop):
# -----
DEFAULT_SERVER_PORT = 80
IGNORED_HEADERS = { # Q: or just rely on .HOP_HEADERS somewhere inside aiohttp?
hdrs.CONNECTION,
hdrs.KEEP_ALIVE,
hdrs.PROXY_AUTHENTICATE,
hdrs.PROXY_AUTHORIZATION,
hdrs.TE,
hdrs.TRAILER,
hdrs.TRANSFER_ENCODING,
hdrs.UPGRADE,
}
# -----
def __init__(self, app, *, is_async = False, **kwargs):
super().__init__(**kwargs)
self.wsgi_app = app
self.is_async = is_async
self._log = logging.getLogger(str(self))
def __str__(self):
return "%s(%s)" % (self.__class__.__name__, str(self.wsgi_app))
def create_wsgi_environ(self, req :web.Request, content):
"""(adapted from: aiohttp.wsgi.WSGIServerHttpProtocol)"""
headers = req.headers
server_host_port = headers.get(hdrs.HOST, "<unknown-host>").split(":")
if (len(server_host_port) == 1):
server_host_port.append(self.DEFAULT_SERVER_PORT)
environ = { # TODO: lazy dict
"wsgi.input": content,
"wsgi.errors": sys.stderr,
"wsgi.version": (1, 0),
"wsgi.async": self.is_async,
"wsgi.multithread": False,
"wsgi.multiprocess": False,
"wsgi.run_once": False,
"wsgi.file_wrapper": None, # TODO: use FileSender
"wsgi.url_scheme": req.scheme,
"SERVER_NAME": server_host_port[0],
"SERVER_PORT": int(server_host_port[1]),
"SERVER_SOFTWARE": aiohttp.HttpMessage.SERVER_SOFTWARE,
"REQUEST_METHOD": req.method,
"SERVER_PROTOCOL": "HTTP/%s.%s" % req.version,
"SCRIPT_NAME": "", # TODO: use "mount-point" (stable path prefix) of self
"PATH_INFO": req.path,
"QUERY_STRING": unquote_plus(req.query_string),
}
if (req.has_body):
environ["CONTENT_TYPE"] = req.content_type
try:
environ["CONTENT_LENGTH"] = headers[hdrs.CONTENT_LENGTH]
except KeyError:
pass
# CGI-compatible (Q: is correct?)
#for optional_hdr in ("REMOTE_USER", "REMOTE_HOST", "REMOTE_ADDR", "REMOTE_PORT",):
# try:
# environ[optional_hdr] = headers[optional_hdr]
# except KeyError:
# pass
# HTTP_* (+ HTTP_HOST)
for other_hdr, v in headers.items():
other_hdr = other_hdr.replace("-", "_")
if (other_hdr in environ):
continue
other_hdr = "HTTP_" + other_hdr
prev_v = environ.get(other_hdr, None)
if (not prev_v is None):
v = prev_v+","+v
environ[other_hdr] = v
#if (self.is_async): # TODO:
# environ['async.reader'] = ...
# environ['async.writer'] = ...
assert (not environ['SCRIPT_NAME'])
return environ
def start_wsgi_response(self, status_reason, headers, exc_info = None, *, _respreq_ctx :dict):
"""
start_response() callable as in PEP 3333
(see also aiohttp.wsgi.WsgiResponse)
"""
if exc_info:
if _respreq_ctx.get("status", None):
raise exc_info[1]
status_reason = status_reason.split(' ', 1)
status = status_reason[0]
try:
reason = status_reason[1]
except IndexError:
reason = None
_respreq_ctx["status"] = int(status)
_respreq_ctx["reason"] = reason
_respreq_ctx["headers"] = { hdr : v for hdr, v in headers if (not hdr in self.IGNORED_HEADERS)}
def _wsgi_write_data(data):
try:
return _respreq_ctx["_out"].write(data)
except KeyError:
_respreq_ctx["_out"] = buf = BytesIO()
self._log.debug("wsgi_write(...): output buffer created")
return buf.write(data)
return _wsgi_write_data
async def handle_request(self, req :web.Request):
# --
if (not self.is_async):
content = SyncStreamReader(req.content, loop = self._loop)
else:
content = req.content
# --
ctx = {}
resp = None
def create_response__drain_wsgi_result(wsgi_result):
nonlocal resp
#
next_async = to_async_corofunc(next, _loop = self._loop)
_first_results = []
#
async def _prepare_resp():
while (not "status" in ctx):
r = await next_async(wsgi_result)
if (not r is None):
_first_results.append(r)
#
nonlocal resp
resp = web.StreamResponse(
status = ctx["status"], reason = ctx["reason"], headers = ctx["headers"]
)
await resp.prepare(req)
yield _prepare_resp()
#
try:
yield ctx["_out"].read()
except KeyError:
pass
finally:
ctx.clear()
#
yield from _first_results
#
while wsgi_result: # (or just True)
yield next_async(wsgi_result)
async with AsyncContextManagerGuard(
to_async_corofunc(self.wsgi_app, _loop = self._loop)(
self.create_wsgi_environ(req, content),
lambda *a: self.start_wsgi_response(*a, _respreq_ctx = ctx)
)
) as wsgi_result:
async for datum in AwaitTransparent_Reducing_AsyncGenerator(
create_response__drain_wsgi_result(wsgi_result)
):
resp.write(datum)
await resp.drain()
# --
await resp.write_eof() # Q: shall we close response stream explicitly?!
return resp
class with_EventLoop:
def __init__(self, *, loop = None):
self._loop = loop or asyncio.get_event_loop()
@property
def aio_loop(self):
return self._loop
class AsyncContextManagerGuard:
def __init__(self, enter_corofunc, exit_corofunc = None, *, guarded = None):
if (not inspect.iscoroutine(enter_corofunc)):
enter_corofunc = asyncio.coroutine(enter_corofunc)
self._enter = enter_corofunc
self._guarded = guarded
#
self._exit = None
if (not exit_corofunc is None):
self._exit_func = asyncio.coroutine(exit_corofunc)
async def __aenter__(self):
assert (not self._enter is None)
if (inspect.iscoroutine(self._enter)):
ret = await self._enter
self._enter = None
else:
ret = await self._enter()
return self._guarded or ret
async def __aexit__(self, *exc_info):
if (not self._exit is None):
try:
await self._exit(exc_info)
except TypeError:
await self._exit()
class SyncStreamReader(with_EventLoop):
def __init__(self, async_stream, **kwargs):
super().__init__(**kwargs)
self.async_stream = async_stream
# TODO: use separate executor for stream reader
# OR (better) create intelligent ThreadPoolExecutor
self._read = to_sync_func(async_stream.read, _loop = self._loop)
def read(self, *args, **kwargs):
return self._read(*args, **kwargs)
class AwaitTransparent_Reducing_AsyncGenerator:
def __init__(self, gen):
"""
:param gen: synchronous generator
TODO: 1) support for iterable: list, ...;
2) support for asynchronous generator,genfunc,iterable??
"""
self._gen_func = None
self._gen = None
if (inspect.isfunction(gen)):
self._gen_func = gen
if (inspect.isgenerator(gen)):
self._gen = gen
# TODO: <3.5.2 compatibility,
# see: https://docs.python.org/3/reference/datamodel.html#asynchronous-iterators
def __aiter__(self):
if (self._gen_func):
self._gen = self._gen_func() or self._gen
return self
async def __anext__(self):
while (inspect.isawaitable(self._gen)): # <=> "AwaitTransparent"/sequence-container
self._gen = await self._gen
#
n = None
while (n is None): # <=> "Reducing"
try:
n = next(self._gen)
except StopIteration as exc:
raise StopAsyncIteration() from exc
#
while (inspect.isawaitable(n)): # <=> "AwaitTransparent"/elements
n = await n
assert (not n is None)
return n
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment