Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Limit number of concurrently running asyncio tasks
import asyncio
from collections import deque
class AsyncioPool:
    """Run queued coroutines with a cap on how many execute concurrently.

    Coroutines are queued with add_coro() and are only wrapped in tasks when
    a slot is free, so at most ``concurrency`` of them are running at any
    time. Running coroutines may queue further work while the pool drains.

    The pool does not collect task results and does not re-raise exceptions
    raised inside the coroutines.
    """

    def __init__(self, concurrency, loop=None):
        """
        @param concurrency: Maximum number of concurrently running tasks (>= 1)
        @param loop: asyncio loop; defaults to the current event loop
        @raise ValueError: if concurrency is smaller than 1
        """
        # With no free slot ever available, nothing could be started and the
        # wait loop would fail on an empty task set; reject this up front.
        if concurrency < 1:
            raise ValueError('concurrency must be at least 1, got %r' % (concurrency,))
        if loop is None:
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                # Newer Python versions no longer create a loop implicitly
                # when none is set for the current thread.
                loop = asyncio.new_event_loop()
        self._loop = loop
        self._concurrency = concurrency
        self._coros = deque()  # All coroutines queued for execution
        self._futures = set()  # All currently running coroutines

    def add_coro(self, coro):
        """
        @param coro: coroutine to add
        """
        self._coros.append(coro)
        self.print_status()

    def run_until_complete(self):
        """Block until every queued and running coroutine has finished."""
        self._loop.run_until_complete(self._wait_for_futures())

    def print_status(self):
        """Print the number of queued coroutines and running tasks."""
        print(' Status: coros:%s - futures:%s' % (len(self._coros), len(self._futures)))

    def _start_futures(self):
        """Start queued coroutines until the concurrency limit is reached."""
        free_slots = self._concurrency - len(self._futures)
        for _ in range(min(free_slots, len(self._coros))):
            coro = self._coros.popleft()
            self._futures.add(asyncio.ensure_future(coro, loop=self._loop))
            self.print_status()

    async def _wait_for_futures(self):
        """Drain the queue, refilling free slots whenever a task finishes."""
        while self._coros or self._futures:
            # Refill first: tasks that just finished may have queued new work.
            self._start_futures()
            # No ``loop=`` argument here: asyncio.wait() dropped it in
            # Python 3.10 and uses the running loop, which is self._loop
            # because this coroutine is driven by run_until_complete().
            done, _pending = await asyncio.wait(
                self._futures, return_when=asyncio.FIRST_COMPLETED)
            self._futures.difference_update(done)
# Module-level pool used by the demo below: at most three jobs run at once.
pool = AsyncioPool(concurrency=3)
async def work(i):
    """Demo job that queues two follow-up jobs and then pauses briefly.

    The counter is incremented first; follow-up jobs are only queued on the
    module-level pool while the incremented value is below 5.

    @param i: counter value of the job that scheduled this one
    """
    level = i + 1
    if level < 5:
        print('Work, adding more task %s' % level)
        for _ in range(2):
            pool.add_coro(work(level))
    # Stand-in for real work; yields control so other pool tasks can run.
    await asyncio.sleep(0.1)
# Seed the pool with one root job; it queues further jobs as it runs.
pool.add_coro(coro=work(0))
# Block until the queue is empty and every started job has finished.
pool.run_until_complete()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment