Created
January 19, 2016 14:52
-
-
Save ldotlopez/b965d4887da6061c3a53 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import time | |
from concurrent import futures | |
loop = asyncio.get_event_loop() | |
loop.set_debug(True) | |
def print_future_state(fut): | |
print('future state done:{} cancelled:{}'.format( | |
fut.done(), fut.cancelled() | |
)) | |
@asyncio.coroutine | |
def sleep(secs=0.1): | |
print('sleep') | |
yield from asyncio.sleep(secs) | |
print('wake') | |
def blocking_sleep(secs=0.1): | |
print('sleep') | |
time.sleep(secs) | |
print('wake') | |
@asyncio.coroutine | |
def wrap_sleep_with_future(coro, fut): | |
res = yield from coro | |
fut.set_result(res) | |
def on_timeout(fut): | |
if not fut.done(): | |
fut.cancel() | |
print_future_state(fut) | |
loop.stop() | |
def on_timeout_with_reenter(fut): | |
def _timeout_reenter_loop(): | |
print_future_state(fut) | |
loop.stop() | |
if not fut.done(): | |
fut.cancel() | |
print_future_state(fut) | |
loop.call_soon(_timeout_reenter_loop) | |
def on_future_done(fut): | |
print('done') | |
print('result:', fut.result()) | |
loop.stop() | |
def test_future_with_done_callback(): | |
fut = asyncio.Future() | |
fut.add_done_callback(on_future_done) | |
coro = sleep() | |
loop.create_task(wrap_sleep_with_future(coro, fut)) | |
def test_create_task_with_done_callback(): | |
coro = sleep() | |
task = loop.create_task(coro) | |
task.add_done_callback(on_future_done) | |
def test_create_task_with_done_and_timeout(): | |
coro = sleep(0.5) | |
task = loop.create_task(coro) | |
task.add_done_callback(on_future_done) | |
loop.call_later(0.3, on_timeout, task) | |
def test_create_task_with_done_and_timeout_with_reenter(): | |
coro = sleep(0.5) | |
task = loop.create_task(coro) | |
task.add_done_callback(on_future_done) | |
loop.call_later(0.3, on_timeout_with_reenter, task) | |
def test_exception_handler(): | |
def _hander(loop, ctx): | |
print("exception:", type(ctx['exception'])) | |
print_future_state(ctx['handle']) | |
loop.set_exception_handler(_hander) | |
coro = sleep(0.5) | |
task = loop.create_task(coro) | |
task.add_done_callback(on_future_done) | |
def test_executor(): | |
executor = futures.ThreadPoolExecutor(max_workers=5) | |
loop.set_default_executor(executor) | |
for s in range(1, 11): | |
loop.run_in_executor(None, blocking_sleep, s/10) | |
# Use call_soon method to run tests | |
loop.call_soon(test_executor) | |
loop.run_forever() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment