# Source: GitHub gist minrk/095b8eb5e115839b12abba1d4badd92e by @minrk.
# Created March 1, 2018 16:20. Stars: 1, forks: 0.
"""Testing awaitable objects
Testing compatibility of various awaitable things
for a migration from tornado coroutines to async/await.
The main pitfall appears to be our use of `tornado.gen.maybe_future`
to wrap user-defined 'maybe async' APIs,
which works for everything *except* asyncio coroutines.
If we want to make sure things are accepted by gen.maybe_future,
we have to return Futures, and not even asyncio.Task Future subclasses.
"""
import asyncio
import concurrent.futures
import inspect

import tornado.concurrent
import tornado.ioloop
from tornado import gen
from tornado.platform.asyncio import AsyncIOMainLoop, to_asyncio_future
def awaitable(obj):
    """Wrap *obj* in a Future that can be awaited or yielded.

    For our compatibility, this must accept:

    - tornado Future (works both ways)
    - asyncio Future (works both ways)
    - asyncio coroutine (gen.maybe_future doesn't work)
    - tornado coroutine (asyncio.ensure_future doesn't work)
    - scalar (asyncio.ensure_future doesn't work)

    The result must also not break downstream calls to
    ``gen.maybe_future``, which means coroutines are turned into Futures.

    Dispatch, checked in this order:

    - ``concurrent.futures.Future`` -> ``asyncio.wrap_future(obj)``
    - tornado Future -> ``to_asyncio_future(obj)``
    - any other awaitable -> ``asyncio.ensure_future(obj)``; when that
      produces an ``asyncio.Task``, the task is chained into a fresh
      tornado Future and that Future is returned instead
    - anything else -> ``to_asyncio_future(gen.maybe_future(obj))``
    """
    if isinstance(obj, concurrent.futures.Future):
        return asyncio.wrap_future(obj)

    if isinstance(obj, tornado.concurrent.Future):
        return to_asyncio_future(obj)

    if not inspect.isawaitable(obj):
        # Not awaitable: let tornado wrap it first, then convert the result.
        return to_asyncio_future(gen.maybe_future(obj))

    fut = asyncio.ensure_future(obj)
    if not isinstance(fut, asyncio.Task):
        return fut

    # Tasks must be wrapped in tornado Futures in order to be safe for
    # downstream gen.maybe_future calls.  This adds overhead to the most
    # common case, which is undesirable: ideally it would only be done when
    # the result is known to be handed to a gen.maybe_future-based API.
    wrapped = tornado.concurrent.Future()
    tornado.concurrent.chain_future(fut, wrapped)
    return wrapped
def scalar():
    """Return the plain, non-awaitable value 5."""
    return 5
async def asyncio_coro():
    """Native coroutine: sleep 0.1 s with asyncio.sleep, then return 5."""
    await asyncio.sleep(0.1)
    return 5
@gen.coroutine
def tornado_coro():
    """Tornado generator coroutine: sleep 0.1 s with gen.sleep, then return 5."""
    yield gen.sleep(0.1)
    return 5
def _resolved(future):
    """Set *future*'s result to 5 and hand the same object back."""
    future.set_result(5)
    return future


# One already-completed Future of each flavour; each one holds the result 5.
concurrent_future = _resolved(concurrent.futures.Future())
asyncio_future = _resolved(asyncio.Future())
tornado_future = _resolved(tornado.concurrent.Future())
# Expression strings consumed by the test runners in this file: each one is
# passed to eval() and the outcome is yielded/awaited and compared against 5.
# An empty string makes the runner print a blank separator line.
tests = [
    # plain Future objects
    "concurrent_future",
    "asyncio_future",
    "tornado_future",
    # plain coroutine objects
    "asyncio_coro()",
    "tornado_coro()",
    "",

    # tornado's maybe_future wrapper
    "gen.maybe_future(concurrent_future)",
    "gen.maybe_future(asyncio_future)",
    "gen.maybe_future(tornado_future)",
    "gen.maybe_future(tornado_coro())",
    "gen.maybe_future(asyncio_coro())",
    "gen.maybe_future(scalar())",
    "",

    # asyncio's ensure_future wrapper
    "asyncio.ensure_future(concurrent_future)",
    "asyncio.ensure_future(asyncio_future)",
    "asyncio.ensure_future(tornado_future)",
    "asyncio.ensure_future(asyncio_coro())",
    "asyncio.ensure_future(tornado_coro())",
    "asyncio.ensure_future(scalar())",
    "",

    # our own awaitable() wrapper
    "awaitable(concurrent_future)",
    "awaitable(asyncio_future)",
    "awaitable(tornado_future)",
    "awaitable(asyncio_coro())",
    "awaitable(tornado_coro())",
    "awaitable(scalar())",
    "",

    # two layers deep: does awaitable() break downstream wrappers?
    "gen.maybe_future(awaitable(asyncio_coro()))",
    "asyncio.ensure_future(awaitable(tornado_coro()))",
    "",
]
@gen.coroutine
def tornado_test():
    """Yield every expression in ``tests`` from inside a tornado coroutine.

    For each non-empty entry, prints ``tornado: yield <expr>...`` followed
    by ``ok`` when the yielded result equals 5, or ``failed: <error>`` when
    evaluating, yielding, or the result check raises.  Empty entries print
    a blank line.
    """
    for expr in tests:
        if not expr:
            print()
            continue
        print("tornado: yield %s" % expr, end='...')
        try:
            # eval() only ever sees the hard-coded strings in `tests`.
            r = yield eval(expr)
            assert r == 5, "incorrect result: %r" % r
        except Exception as e:
            print("failed: %s" % e)
            # Pause 0.1 s after a failure before trying the next expression.
            yield gen.sleep(.1)
        else:
            print('ok')
async def aio_test():
    """Await every expression in ``tests`` from inside a native coroutine.

    For each non-empty entry, prints ``asyncio: await <expr>...`` followed
    by ``ok`` when the awaited result equals 5, or ``failed: <error>`` when
    evaluating, awaiting, or the result check raises.  Empty entries print
    a blank line.
    """
    for expr in tests:
        if not expr:
            print()
            continue
        print("asyncio: await %s" % expr, end='...')
        try:
            # eval() only ever sees the hard-coded strings in `tests`.
            r = await eval(expr)
            assert r == 5, "incorrect result: %r" % r
        except Exception as e:
            print("failed: %s" % e)
            # Pause 0.1 s after a failure before trying the next expression.
            await asyncio.sleep(.1)
        else:
            print('ok')
if __name__ == '__main__':
    # Install tornado's AsyncIOMainLoop, then drive both suites to
    # completion on the current IOLoop: native-coroutine suite first,
    # a blank line, then the tornado-coroutine suite.
    AsyncIOMainLoop().install()
    loop = tornado.ioloop.IOLoop.current()
    loop.run_sync(aio_test)
    print()
    loop.run_sync(tornado_test)
# (GitHub page footer: sign up or sign in to comment on this gist.)