Skip to content

Instantly share code, notes, and snippets.

@mdellavo
Created April 24, 2017 00:28
Show Gist options
  • Star 10 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save mdellavo/6aef86f7eba778611ce3cb9464c5fae5 to your computer and use it in GitHub Desktop.
Save mdellavo/6aef86f7eba778611ce3cb9464c5fae5 to your computer and use it in GitHub Desktop.
asyncio task pool
import asyncio
from asyncio.queues import Queue
TERMINATOR = object()  # sentinel queued by join() to tell one worker to exit


class TaskPool(object):
    """Run submitted awaitables through a fixed number of worker tasks.

    At most ``num_workers`` submitted awaitables are awaited concurrently.
    Each call to :meth:`submit` returns a future that receives the
    awaitable's result, or its exception if it raised.
    """

    def __init__(self, loop, num_workers):
        """Create the work queue and start the workers.

        Args:
            loop: Event loop the workers and result futures belong to.
            num_workers: Number of worker tasks to start.
        """
        self.loop = loop
        # Queue's ``loop`` argument was removed in Python 3.10; the queue is
        # only awaited by the workers, which all run on ``self.loop``.
        self.tasks = Queue()
        self.workers = [
            self.loop.create_task(self.worker()) for _ in range(num_workers)
        ]

    async def worker(self):
        """Consume queued work until a TERMINATOR entry is received."""
        while True:
            future, task = await self.tasks.get()
            if task is TERMINATOR:
                break
            try:
                result = await task
            except asyncio.CancelledError:
                # Cancellation still stops this worker, but the caller's
                # future must not be left pending forever.
                future.cancel()
                raise
            except Exception as exc:
                # Hand the failure to the caller and keep the worker alive
                # so the remaining queued work is still processed.
                if not future.done():
                    future.set_exception(exc)
            else:
                # The caller may have cancelled the future in the meantime;
                # setting a result on it would raise InvalidStateError.
                if not future.done():
                    future.set_result(result)

    def submit(self, task):
        """Queue an awaitable and return a future for its outcome.

        Args:
            task: Awaitable (e.g. a coroutine object) to run on a worker.

        Returns:
            A future bound to ``self.loop`` that is resolved with the
            awaitable's result or exception. Work submitted after
            :meth:`join` has queued its terminators is never run.
        """
        future = self.loop.create_future()
        self.tasks.put_nowait((future, task))
        return future

    async def join(self):
        """Wait for all previously submitted work, then stop the workers.

        One TERMINATOR is queued per worker; because the queue is FIFO,
        every item submitted earlier is processed before the workers exit.
        """
        for _ in self.workers:
            self.tasks.put_nowait((None, TERMINATOR))
        await asyncio.gather(*self.workers)
async def do_stuff():
    """Placeholder coroutine for the demo; performs no work."""
    return None
async def main():
    """Demo: run ten ``do_stuff`` coroutines through a four-worker pool.

    Returns:
        The results of the submitted coroutines, in submission order.
    """
    # ``await`` is only valid inside a coroutine, so the demo lives in one
    # and binds the pool to the loop that is actually running it.
    pool = TaskPool(asyncio.get_running_loop(), 4)
    futures = [pool.submit(do_stuff()) for _ in range(10)]
    await pool.join()
    return [future.result() for future in futures]


if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment