-
-
Save JasonLG1979/a2bc6df6048e141353ebb0679a77abb2 to your computer and use it in GitHub Desktop.
decorator take 2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from functools import partial, wraps, update_wrapper | |
from typing import Awaitable, Optional, Any, Callable | |
def queue_asyncio_task(func) -> Awaitable[asyncio.Task]: | |
""" | |
A decorator that allows for the queuing of coroutine functions \ | |
by automating :meth:`asyncio.ensure_future` and :meth:`asyncio.Task.add_done_callback`. \ | |
If the return value is of no concern the coroutine function may be called \ | |
normally as if it were a synchronous function. Otherwise the \ | |
optional keyword arguments ``callback`` and ``user_data`` may be used. \ | |
All other positonal and keyword arguments, if any, \ | |
will be passed directly to the decorated coroutine function. | |
:param callback: A :meth:`queue_asyncio_task_callback` type callback. | |
:param user_data: User data to be passed to the callback unchanged. | |
:type callback: :class:`Optional` [:class:`Callable`] | |
:type user_data: :class:`Optional` [:class:`Any`] | |
:returns: A :class:`asyncio.Task` that can be used, for example, to cancel the task if desired.` | |
""" | |
@wraps(func) | |
def wrapper(*args, **kwargs): | |
callback = kwargs.pop('callback', None) | |
user_data = kwargs.pop('user_data', None) | |
task = asyncio.ensure_future(func(*args, **kwargs)) | |
if callback: | |
task.add_done_callback(partial(callback, user_data=user_data)) | |
return task | |
return update_wrapper(wrapper, func) | |
def queue_asyncio_task_callback(task: asyncio.Task, | |
user_data: Optional[Any] = None) -> None: | |
""" | |
Type definition for a callback for a :meth:`queue_asyncio_task` decorated coroutine function. \ | |
Called when the task has been canceled, or had it's exception or result set. | |
:param task: A task to get the result or exception from. | |
:param user_data: User data passed to the callback | |
""" | |
# Don't actually use this for anything. For documentation purposes only!!! | |
try: | |
result = task.result() | |
except asyncio.CancelledError: | |
print('Cancelled task: {}'.format(task)) | |
except Exception as e: | |
print('Exception in task: {}: {}'.format(e.__class__.__name__, e)) | |
else: | |
print('Task result: {}'.format(result)) | |
if user_data is not None: | |
print('callback user_data: {}'.format(user_data)) | |
if __name__ == '__main__': | |
from random import choice, randint | |
def on_sleep_done(task, user_data=None): | |
try: | |
time = task.result() | |
except asyncio.CancelledError: | |
print('Cancelled task: {}'.format(task)) | |
except Exception as e: | |
print('Exception in task: {}: {}'.format(e.__class__.__name__, e)) | |
else: | |
print('Task completed after {} seconds'.format(time)) | |
if user_data: | |
cancel_all_and_stop_loop() | |
def cancel_all_and_stop_loop(): | |
for task in asyncio.Task.all_tasks(): | |
if not task.done(): | |
task.cancel() | |
loop.stop() | |
@queue_asyncio_task | |
async def sleep(time): | |
await asyncio.sleep(time) | |
return time | |
def test(): | |
total_tasks = 0 | |
exception_tasks = 0 | |
cancelled_tasks = 0 | |
successful_tasks = 0 | |
longest_task = 0 | |
for _ in range(20): | |
choices = [i for i in range(randint(1, 20))] + ['a'] | |
_choice = choice(choices) | |
cancel = choice([True, False]) | |
task = sleep(_choice, callback=on_sleep_done, user_data=False) | |
if cancel: | |
task.cancel() | |
cancelled_tasks += 1 | |
elif _choice == 'a': | |
exception_tasks += 1 | |
else: | |
successful_tasks += 1 | |
if longest_task < _choice: | |
longest_task = _choice | |
total_tasks += 1 | |
longest_task = longest_task + 1 | |
total_tasks += 1 | |
successful_tasks += 1 | |
sleep(longest_task, callback=on_sleep_done, user_data=True) | |
print('Total tasks: {}'.format(total_tasks)) | |
print('Cancelled task: {}'.format(cancelled_tasks)) | |
print('Tasks that will trigger an Exception: {}'.format(exception_tasks)) | |
print('Tasks that will succeed: {}'.format(successful_tasks)) | |
print('Longest task: {} seconds'.format(longest_task)) | |
test() | |
loop = asyncio.get_event_loop() | |
loop.run_forever() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment