Skip to content

Instantly share code, notes, and snippets.

@johnliu55tw
Created February 22, 2018 15:21
Show Gist options
  • Save johnliu55tw/c46754ff9f8a6ea8e863e788d92f678a to your computer and use it in GitHub Desktop.
Save johnliu55tw/c46754ff9f8a6ea8e863e788d92f678a to your computer and use it in GitHub Desktop.
Run a coroutine repeatedly, once for each entry in a list of arguments, while limiting how many invocations are running at the same time.
import asyncio
import random
from uuid import uuid4
# Upper bound on the number of jobs allowed to be in flight at once.
MAX_WORKERS = 20
# Counting semaphore holding one permit per worker slot.
sema = asyncio.Semaphore(value=MAX_WORKERS)
# IDs of the jobs that have been dispatched but have not finished yet.
tasks = set()
# Values returned by the finished jobs, appended as each job completes.
results = []
async def gather(id, coro, args):
    """Await ``coro(args)``, store its result, and free the job's worker slot.

    On success the value returned by ``coro(args)`` is appended to the
    module-level ``results`` list. Whether or not the call succeeds, ``id``
    is dropped from the module-level ``tasks`` set and one permit is returned
    to the module-level ``sema`` semaphore.

    Args:
        id: Key under which this job is tracked in ``tasks``. The name
            shadows the ``id`` builtin; it is kept unchanged so existing
            callers (including keyword callers) keep working.
        coro: Coroutine function invoked with ``args`` as its only argument.
        args: Value handed to ``coro`` unchanged.

    Raises:
        Any exception raised by ``coro(args)``; it propagates after the
        bookkeeping has been undone.
    """
    try:
        result = await coro(args)
        results.append(result)
    finally:
        # Undo the bookkeeping even when the job fails or is cancelled;
        # otherwise the permit is lost and anything waiting for ``tasks``
        # to drain never finishes.
        # ``discard`` (not ``remove``) so a missing ID cannot prevent the
        # semaphore from being released.
        tasks.discard(id)
        sema.release()
async def dispatching(coro, args_list):
    """Run ``coro`` once per entry of ``args_list`` and wait for all runs.

    Before each job is started a permit is acquired from the module-level
    ``sema`` semaphore, so a new job is only scheduled once a permit is
    available. Every job is wrapped in ``gather``, which is the function that
    gives the permit back. A progress line is printed before waiting for a
    permit and again when the job is dispatched.

    Args:
        coro: Coroutine function taking a single argument.
        args_list: Iterable of arguments; one job is scheduled per element.

    Returns:
        None. An empty ``args_list`` returns immediately.
    """
    pending = []
    for args in args_list:
        print('Wait for semaphore...')
        await sema.acquire()
        job_id = uuid4()
        print('Got it! Dispatching job. ID: {}'.format(job_id))
        tasks.add(job_id)
        # Keep a strong reference to the scheduled future: the event loop
        # itself only holds weak references to tasks, so a discarded one may
        # be garbage-collected before it finishes.
        pending.append(asyncio.ensure_future(gather(job_id, coro, args)))
    # Wait on the futures themselves instead of polling ``tasks`` once per
    # second. ``asyncio.wait`` rejects an empty collection, hence the guard.
    # Job exceptions are not re-raised here; they stay on their futures.
    if pending:
        await asyncio.wait(pending)
async def work(args):
    """Stand-in job: pause for a random 1-3 seconds, then echo ``args`` back."""
    # uniform(1, 3) evaluates 1 + (3 - 1) * random(), the same delay as before.
    delay = random.uniform(1, 3)
    await asyncio.sleep(delay)
    return args
if __name__ == '__main__':
    # Demo: push the arguments 0..99 through ``work`` and check that every
    # one of them came back.
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(dispatching(work, range(100)))
    finally:
        # Release the loop's resources even if the run fails.
        loop.close()
    print('All tasks done.')
    # Jobs sleep for random durations, so results are sorted before comparing.
    results.sort()
    print(results)
    assert results == list(range(100))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment