Skip to content

Instantly share code, notes, and snippets.

@mprymek
Created August 21, 2019 15:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mprymek/133244919964aa18f57fbda3b55df738 to your computer and use it in GitHub Desktop.
Save mprymek/133244919964aa18f57fbda3b55df738 to your computer and use it in GitHub Desktop.
import asyncio
import asyncio.futures
import logging
import aiojobs
logging.basicConfig(format='[%(levelname)-7s] %(message)s', level=logging.DEBUG)
logger = logging.getLogger(__name__)
class Nursery:
'''
Implements aionursery-like API over aiojobs' Scheduler.
There's one big and extremely stupid catch: the `exception_handler` is
*sync* function therefore we cannot call async `self._scheduler.close()`
from it directly. Hence the `_exception_event`.
'''
def __init__(self, name='nursery'):
self._name = name
self._exception_event = asyncio.Event()
self._scheduler = None
self._killer_task = None
def _exception_handler(self, scheduler, context):
logging.debug(f'{self._name}: exc handler fired')
self._exception_event.set()
# call default exception handler to logging.info out exception info
logging.error(80*'-')
scheduler._loop.call_exception_handler(context)
logging.error(80*'-')
async def __aenter__(self):
logging.debug(f'{self._name}: aenter')
self._scheduler = await aiojobs.create_scheduler(limit=None, exception_handler=self._exception_handler)
async def killer(scheduler, close_event):
logging.debug(f'{self._name}: killer started')
await close_event.wait()
logging.info(f'{self._name}: killing all jobs')
await self._scheduler.close()
self._killer_task = asyncio.create_task(killer(self._scheduler, self._exception_event))
return self
async def __aexit__(self, exc_type, exc, tb):
logging.debug(f'{self._name}: aexit')
await self._scheduler.close()
if self._killer_task is not None:
self._killer_task.cancel()
async def wait(self):
# When we use the following code, `_exception_handler` is not called.
# -God- Guido knows why...
#wait_coros = [job.wait() for job in self._scheduler]
#await asyncio.gather(*wait_coros, return_exceptions=True)
# ...therefore I'm using this worse "semi-busy waiting" code.
while len(self._scheduler):
await asyncio.sleep(0.1)
def __repr__(self):
return f'<Nursery {self._name} jobs={len(self._scheduler)}>'
async def start_soon(self, func, *args, **kwargs):
await self._scheduler.spawn(func(*args, **kwargs))
async def task1(name, delay=0.5, count_to=4, raise_at=None):
try:
logging.info(f'{name}: start')
for i in range(count_to):
logging.info(f'{name}: {i}')
if i == raise_at:
raise RuntimeError(f'error in task {name}')
await asyncio.sleep(delay)
logging.info(f'{name}: end')
except asyncio.CancelledError:
logging.info(f'{name}: cancelled')
async def check_tasks():
#logging.info('\n** waiting for non-pending tasks cleanup...')
await asyncio.sleep(0.1)
pending = [task for task in asyncio.Task.all_tasks() if task._state == asyncio.futures._PENDING]
if len(pending) != 1:
logging.info('************ ERROR! Pending tasks!')
for task in pending:
logging.info(f'\t{task}')
async def test1():
logging.info('*********** classic aiojobs scheduler')
scheduler1 = await aiojobs.create_scheduler()
for i in range(2):
await scheduler1.spawn(task1(f's1t{i}'))
logging.info('scheduler1: tasks spawned')
logging.info(scheduler1)
logging.info('waiting for jobs in scheduler1 to end')
wait_coros = [job.wait() for job in scheduler1]
await asyncio.gather(*wait_coros, return_exceptions=True)
await scheduler1.close()
logging.info('scheduler1: closed')
logging.info(scheduler1)
await check_tasks()
async def test2():
logging.info('*********** nursery without waiting - tasks 0 and 2 should be killed')
async with Nursery('n1') as nursery:
for i in range(2):
await nursery.start_soon(task1, f'n1t{i}')
logging.info('nursery n1: tasks started')
logging.info(nursery)
logging.info('nursery n1 closed')
await check_tasks()
async def test3():
logging.info('*********** nursery with waiting - tasks should end cleanly')
async with Nursery('n2') as nursery:
for i in range(2):
await nursery.start_soon(task1, f'n2t{i}')
logging.info('nursery n2: tasks started')
logging.info(nursery)
logging.info('waiting for jobs in nursery n2')
await nursery.wait()
logging.info('nursery n2 closed')
await check_tasks()
async def test4():
logging.info('*********** nursery with exception raising - tasks 0 and 2 should be killed')
async with Nursery('n3') as nursery:
for i in range(3):
if i == 1:
await nursery.start_soon(task1, f'n3t{i}', raise_at=1)
else:
await nursery.start_soon(task1, f'n3t{i}')
logging.info('nursery n3: tasks started')
logging.info(nursery)
logging.info('waiting for jobs in nursery n3')
await nursery.wait()
logging.info('nursery n3 closed')
await check_tasks()
async def main():
await test1()
await test2()
await test3()
await test4()
if __name__ == '__main__':
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment