Skip to content

Instantly share code, notes, and snippets.

@iissme
Last active October 14, 2017 17:08
Show Gist options
  • Save iissme/f2540c80d63b71a1fd41de4476058ee8 to your computer and use it in GitHub Desktop.
Save iissme/f2540c80d63b71a1fd41de4476058ee8 to your computer and use it in GitHub Desktop.
import asyncio
import weakref
import functools
class AsyncCoroQueueDispatcher:
    """Run queued coroutines one at a time in the background.

    The dispatcher takes a coroutine from its internal queue, schedules it
    on the event loop, waits until it is done, and only then takes the next
    one. It is meant to be embedded in another class and can be rewritten
    for ``asyncio.PriorityQueue`` if needed.

    ``create`` should be run via ``asyncio.ensure_future`` or
    ``loop.create_task``. Cancelling that task makes the dispatcher wait
    for the coroutines that are still queued and then stop its consumer.
    """

    def __init__(self, loop=None, external_cb=None):
        """Set up the queues and remember the optional result callback.

        Args:
            loop: Event loop used to schedule work. When omitted, the loop
                returned by ``asyncio.get_event_loop()`` is used.
            external_cb: Optional callable, or a ``weakref`` reference to
                one, invoked with two positional arguments
                ``(result, exception)`` after each coroutine finishes.
                Non-callable values are ignored.
        """
        # Use the public API: the private asyncio._get_running_loop() returns
        # None outside a running loop, which would leave self.loop unusable.
        self.loop = loop if loop is not None else asyncio.get_event_loop()
        self.tasks_queue = asyncio.Queue()
        # Bounded: once 10 results are waiting unread, further puts stay
        # pending until something consumes from this queue.
        self.results_queue = asyncio.Queue(maxsize=10)
        self._process_next = asyncio.Event()
        self._dispatcher_task = None
        # Pass cb weakref to prevent gc in some cases and let dispatcher
        # finish all tasks.
        # Cb takes 2 positional arguments: task result and task exception.
        self._external_cb = external_cb if callable(external_cb) else None

    async def _queue_consumer(self):
        """Process queued coroutine wrappers forever, strictly one by one."""
        while True:
            self._process_next.clear()
            pending_coro = await self.tasks_queue.get()
            # Keep a reference while the coroutine runs; the event loop
            # itself only holds weak references to tasks.
            running = self.loop.create_task(pending_coro())
            # The wrapper sets this event in its ``finally`` clause.
            await self._process_next.wait()
            del running
            self.tasks_queue.task_done()

    async def create(self):
        """Start the consumer and block until this coroutine is cancelled.

        On cancellation the remaining queued coroutines are awaited, the
        consumer task is cancelled, and the method returns normally (the
        ``CancelledError`` is not re-raised).
        """
        try:
            self._dispatcher_task = self.loop.create_task(
                self._queue_consumer()
            )
            # Never resolved: park here until the caller cancels us.
            await self.loop.create_future()
        except asyncio.CancelledError:
            try:
                # Wait for remaining tasks when the dispatcher's task is
                # cancelled.
                await self.tasks_queue.join()
            finally:
                # Stop the consumer even if the drain itself is cancelled.
                self._dispatcher_task.cancel()

    def enqueue_coro(self, coro):
        """Queue a coroutine object for sequential background execution.

        When the coroutine finishes, its result is put on ``results_queue``
        and the external callback (if any) is called as ``cb(result, None)``.
        When it raises, the callback is called as ``cb(None, exception)``;
        without a callback the exception is re-raised from the future's
        done-callback.

        Args:
            coro: Coroutine object to await.
        """
        result_future = self.loop.create_future()

        @functools.wraps(coro)
        async def run_and_report():
            try:
                res = await coro
            except Exception as e:
                result_future.set_exception(e)
            else:
                result_future.set_result(res)
            finally:
                # Always release the consumer so the queue keeps moving.
                self._process_next.set()

        def task_cb(future):
            if isinstance(self._external_cb, weakref.ReferenceType):
                # Dereference; yields None once the referent is collected.
                external_cb = self._external_cb()
            else:
                external_cb = self._external_cb

            # Only the result lookup is guarded, so an error raised by the
            # callback itself is never reported back to it as a task error.
            try:
                res = future.result()
            except Exception as e:
                if external_cb is None:
                    raise
                # Positional, matching the success call below: the callback
                # contract is two positional arguments.
                external_cb(None, e)
                return

            self.loop.create_task(self.results_queue.put(res))
            if external_cb is not None:
                external_cb(res, None)

        result_future.add_done_callback(task_cb)
        self.tasks_queue.put_nowait(run_and_report)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment