Skip to content

Instantly share code, notes, and snippets.

@jtallieu
Created September 8, 2017 13:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jtallieu/2c89a3512c951b93962825a8068e0d5c to your computer and use it in GitHub Desktop.
Save jtallieu/2c89a3512c951b93962825a8068e0d5c to your computer and use it in GitHub Desktop.
Backing a Python generator with a pool of gevent greenlets that provide async data
"""
Gevent example to illustrate using AsyncResults from a greenlet
I wanted to launch a pool or workers that would generate data
that is provided through an iterator.
"""
import random
import time
import gevent
import gevent.pool
import gevent.event
from gevent import monkey
monkey.patch_all()
manager_working = gevent.event.Event()
def data_provider(id):
"""Worker that provides some data after some period of time"""
time.sleep(random.randint(3, 10))
return "Data from {}".format(id)
def lifeguard(results):
"""Manages a pool of workers that gather data"""
pool = gevent.pool.Pool(5)
# Signal the data_generator to start waiting on results from workers
manager_working.set()
# Start 20 workers 5 at a time
for x in range(0, 20):
# Put a list of async results in the list
async = gevent.event.AsyncResult()
results.append(async)
# Link the results of the greenlet to the AsyncResults
pool.spawn(data_provider, x).link(async)
# Exit when all the data providers have sent results
pool.join()
def data_generator():
"""Iterator that will yield data from a pool of workers"""
results = []
manager = gevent.spawn(lifeguard, results)
done = False
manager_working.wait()
while not done:
ready_results = gevent.wait(results, count=1, timeout=1)
for res in ready_results:
results.remove(res)
yield res.get()
try:
manager.get(block=False)
done = True
except gevent.Timeout:
pass
def main():
for data in data_generator():
print "---->", data
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment