Skip to content

Instantly share code, notes, and snippets.

@ldotlopez
Created January 19, 2016 14:52
Show Gist options
  • Save ldotlopez/b965d4887da6061c3a53 to your computer and use it in GitHub Desktop.
Save ldotlopez/b965d4887da6061c3a53 to your computer and use it in GitHub Desktop.
import asyncio
import time
from concurrent import futures
loop = asyncio.get_event_loop()
loop.set_debug(True)
def print_future_state(fut):
print('future state done:{} cancelled:{}'.format(
fut.done(), fut.cancelled()
))
@asyncio.coroutine
def sleep(secs=0.1):
print('sleep')
yield from asyncio.sleep(secs)
print('wake')
def blocking_sleep(secs=0.1):
print('sleep')
time.sleep(secs)
print('wake')
@asyncio.coroutine
def wrap_sleep_with_future(coro, fut):
res = yield from coro
fut.set_result(res)
def on_timeout(fut):
if not fut.done():
fut.cancel()
print_future_state(fut)
loop.stop()
def on_timeout_with_reenter(fut):
def _timeout_reenter_loop():
print_future_state(fut)
loop.stop()
if not fut.done():
fut.cancel()
print_future_state(fut)
loop.call_soon(_timeout_reenter_loop)
def on_future_done(fut):
print('done')
print('result:', fut.result())
loop.stop()
def test_future_with_done_callback():
fut = asyncio.Future()
fut.add_done_callback(on_future_done)
coro = sleep()
loop.create_task(wrap_sleep_with_future(coro, fut))
def test_create_task_with_done_callback():
coro = sleep()
task = loop.create_task(coro)
task.add_done_callback(on_future_done)
def test_create_task_with_done_and_timeout():
coro = sleep(0.5)
task = loop.create_task(coro)
task.add_done_callback(on_future_done)
loop.call_later(0.3, on_timeout, task)
def test_create_task_with_done_and_timeout_with_reenter():
coro = sleep(0.5)
task = loop.create_task(coro)
task.add_done_callback(on_future_done)
loop.call_later(0.3, on_timeout_with_reenter, task)
def test_exception_handler():
def _hander(loop, ctx):
print("exception:", type(ctx['exception']))
print_future_state(ctx['handle'])
loop.set_exception_handler(_hander)
coro = sleep(0.5)
task = loop.create_task(coro)
task.add_done_callback(on_future_done)
def test_executor():
executor = futures.ThreadPoolExecutor(max_workers=5)
loop.set_default_executor(executor)
for s in range(1, 11):
loop.run_in_executor(None, blocking_sleep, s/10)
# Use call_soon method to run tests
loop.call_soon(test_executor)
loop.run_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment