Skip to content

Instantly share code, notes, and snippets.

@jminuscula
Created December 4, 2015 20:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jminuscula/dd572ef1dbcf1d49e6de to your computer and use it in GitHub Desktop.
Save jminuscula/dd572ef1dbcf1d49e6de to your computer and use it in GitHub Desktop.
import asyncio
import time
from concurrent import futures
class AsyncRunner:
executor = futures.ThreadPoolExecutor(max_workers=5)
def __init__(self):
# self.loop = asyncio.new_event_loop()
self.loop = asyncio.get_event_loop()
self.loop.set_default_executor(AsyncRunner.executor)
self.tasks = set()
self.resultsq = asyncio.Queue()
self.processed = asyncio.Future()
def schedule(self, coro):
task = self.loop.create_task(coro)
self.tasks.add(task)
@asyncio.coroutine
def _process(self):
for task in asyncio.as_completed(self.tasks):
result = yield from task
self.resultsq.put(result)
def results(self):
while not self.processed.done():
result = yield from self.resultsq.get()
yield result
def run(self):
self.loop.run_until_complete(self._process())
self.loop.close()
self.processed.set_result(True)
class Origin:
def __init__(self, idstr, ntasks):
self.runner = AsyncRunner()
for i in range(ntasks):
self.runner.schedule(self.process(idstr, i))
self.runner.run()
@asyncio.coroutine
def process(self, idstr, n):
print("{0} [{1}] Sleep".format(idstr, n))
yield from asyncio.sleep(n)
print("{0} [{1}] Wake".format(idstr, n))
return n
def get_sources(self):
for source in self.runner.results():
print("RES", source)
yield source
if __name__ == '__main__':
originA = Origin('A', 5)
# originB = Origin('B', 5)
for source in originA.get_sources():
print('A', source)
# for source in originB.get_sources():
# print('B', source)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment