Skip to content

Instantly share code, notes, and snippets.

@njam njam/asyncio_pool.py

Created Oct 13, 2017
Embed
What would you like to do?
Limit number of concurrently running asyncio tasks
import asyncio
from collections import deque
class AsyncioPool:
    """
    Run queued coroutines on an asyncio loop, at most `concurrency` at a time.

    Coroutines are started in the order they were added. More coroutines may
    be added while the pool is running, e.g. from inside a running coroutine.
    The pool never inspects the result or exception of a finished task, so an
    exception raised by a coroutine is not re-raised by the pool.
    """

    def __init__(self, concurrency, loop=None):
        """
        @param concurrency: Maximum number of concurrently running tasks (>= 1)
        @param loop: asyncio loop; defaults to the current event loop
        @raise ValueError: if concurrency is smaller than 1
        """
        # With fewer than one slot nothing could ever be started, and waiting
        # on an empty set of futures fails; reject it up front instead.
        if concurrency < 1:
            raise ValueError(
                'concurrency must be at least 1, got %r' % (concurrency,))
        if loop is None:
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                # Newer Python versions no longer create a loop implicitly.
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
        self._loop = loop
        self._concurrency = concurrency
        self._coros = deque()  # All coroutines queued for execution
        self._futures = set()  # All currently running coroutines

    def add_coro(self, coro):
        """
        Queue a coroutine for execution and print the pool status.

        @param coro: coroutine to add
        """
        self._coros.append(coro)
        self.print_status()

    def run_until_complete(self):
        """
        Block until every queued and running coroutine has finished.
        """
        self._loop.run_until_complete(self._wait_for_futures())

    def print_status(self):
        """
        Print the number of queued coroutines and of running tasks.
        """
        print(' Status: coros:%s - futures:%s' % (len(self._coros), len(self._futures)))

    def _start_futures(self):
        """
        Start queued coroutines until the concurrency limit is reached or the
        queue is empty, then print the pool status.
        """
        free_slots = self._concurrency - len(self._futures)
        for _ in range(min(free_slots, len(self._coros))):
            coro = self._coros.popleft()
            self._futures.add(asyncio.ensure_future(coro, loop=self._loop))
        self.print_status()

    async def _wait_for_futures(self):
        """
        Keep the pool filled until no coroutine is queued or running.
        """
        while self._coros or self._futures:
            self._start_futures()
            # No `loop=` argument here: asyncio.wait() dropped it in Python
            # 3.10 and uses the loop this coroutine is running on.
            done, _pending = await asyncio.wait(
                self._futures, return_when=asyncio.FIRST_COMPLETED)
            for future in done:
                self._futures.discard(future)
                # Refill the freed slot right away.
                self._start_futures()
# Demo: a pool with three slots runs a small tree of self-spawning tasks.
pool = AsyncioPool(3)


async def work(i):
    """
    Demo task: while the incremented counter is below 5, queue two follow-up
    tasks on the module-level pool; then sleep for a moment.

    @param i: counter value handed over by the task that queued this one
    """
    level = i + 1
    if level < 5:
        print('Work, adding more task %s' % level)
        for _ in range(2):
            pool.add_coro(work(level))
    await asyncio.sleep(0.1)


pool.add_coro(work(0))
pool.run_until_complete()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.