Skip to content

Instantly share code, notes, and snippets.

@JasonLG1979
Last active January 5, 2018 22:09
Show Gist options
  • Save JasonLG1979/a2bc6df6048e141353ebb0679a77abb2 to your computer and use it in GitHub Desktop.
Save JasonLG1979/a2bc6df6048e141353ebb0679a77abb2 to your computer and use it in GitHub Desktop.
decorator take 2
import asyncio
from functools import partial, wraps, update_wrapper
from typing import Awaitable, Optional, Any, Callable
def queue_asyncio_task(func) -> Awaitable[asyncio.Task]:
"""
A decorator that allows for the queuing of coroutine functions \
by automating :meth:`asyncio.ensure_future` and :meth:`asyncio.Task.add_done_callback`. \
If the return value is of no concern the coroutine function may be called \
normally as if it were a synchronous function. Otherwise the \
optional keyword arguments ``callback`` and ``user_data`` may be used. \
All other positonal and keyword arguments, if any, \
will be passed directly to the decorated coroutine function.
:param callback: A :meth:`queue_asyncio_task_callback` type callback.
:param user_data: User data to be passed to the callback unchanged.
:type callback: :class:`Optional` [:class:`Callable`]
:type user_data: :class:`Optional` [:class:`Any`]
:returns: A :class:`asyncio.Task` that can be used, for example, to cancel the task if desired.`
"""
@wraps(func)
def wrapper(*args, **kwargs):
callback = kwargs.pop('callback', None)
user_data = kwargs.pop('user_data', None)
task = asyncio.ensure_future(func(*args, **kwargs))
if callback:
task.add_done_callback(partial(callback, user_data=user_data))
return task
return update_wrapper(wrapper, func)
def queue_asyncio_task_callback(task: asyncio.Task,
user_data: Optional[Any] = None) -> None:
"""
Type definition for a callback for a :meth:`queue_asyncio_task` decorated coroutine function. \
Called when the task has been canceled, or had it's exception or result set.
:param task: A task to get the result or exception from.
:param user_data: User data passed to the callback
"""
# Don't actually use this for anything. For documentation purposes only!!!
try:
result = task.result()
except asyncio.CancelledError:
print('Cancelled task: {}'.format(task))
except Exception as e:
print('Exception in task: {}: {}'.format(e.__class__.__name__, e))
else:
print('Task result: {}'.format(result))
if user_data is not None:
print('callback user_data: {}'.format(user_data))
if __name__ == '__main__':
from random import choice, randint
def on_sleep_done(task, user_data=None):
try:
time = task.result()
except asyncio.CancelledError:
print('Cancelled task: {}'.format(task))
except Exception as e:
print('Exception in task: {}: {}'.format(e.__class__.__name__, e))
else:
print('Task completed after {} seconds'.format(time))
if user_data:
cancel_all_and_stop_loop()
def cancel_all_and_stop_loop():
for task in asyncio.Task.all_tasks():
if not task.done():
task.cancel()
loop.stop()
@queue_asyncio_task
async def sleep(time):
await asyncio.sleep(time)
return time
def test():
total_tasks = 0
exception_tasks = 0
cancelled_tasks = 0
successful_tasks = 0
longest_task = 0
for _ in range(20):
choices = [i for i in range(randint(1, 20))] + ['a']
_choice = choice(choices)
cancel = choice([True, False])
task = sleep(_choice, callback=on_sleep_done, user_data=False)
if cancel:
task.cancel()
cancelled_tasks += 1
elif _choice == 'a':
exception_tasks += 1
else:
successful_tasks += 1
if longest_task < _choice:
longest_task = _choice
total_tasks += 1
longest_task = longest_task + 1
total_tasks += 1
successful_tasks += 1
sleep(longest_task, callback=on_sleep_done, user_data=True)
print('Total tasks: {}'.format(total_tasks))
print('Cancelled task: {}'.format(cancelled_tasks))
print('Tasks that will trigger an Exception: {}'.format(exception_tasks))
print('Tasks that will succeed: {}'.format(successful_tasks))
print('Longest task: {} seconds'.format(longest_task))
test()
loop = asyncio.get_event_loop()
loop.run_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment