Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@flyer103
Created December 6, 2013 08:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save flyer103/7820284 to your computer and use it in GitHub Desktop.
Save flyer103/7820284 to your computer and use it in GitHub Desktop.
再次测试使用 gevent.event.Event
#!/urs/bin/env python2.7
#coding: utf-8
"""Test the usage of 'gevent.event.Event' class.
"""
import random
import gevent
from gevent.event import Event
class TestEvent(object):
def __init__(self):
self.event = Event()
def run(self):
producers = [gevent.spawn(self._producer, i) for i in xrange(3)]
consumers = [gevent.spawn(self._consumer, i) for i in xrange(3)]
tasks = []
tasks.extend(producers)
tasks.extend(consumers)
gevent.joinall(tasks)
def _producer(self, pid):
print("I'm producer %d and now I don't want consume to do something" % (pid,))
self.event.clear()
sleeptime = random.randint(0, 5) * 0.01
print("Sleeping time is %f" % (sleeptime, ))
gevent.sleep(sleeptime)
print("I'm producer %d and now consumer could do something." % (pid,))
self.event.set()
def _consumer(self, pid):
print("I'm consumer %d and now I'm waiting for producer" % (pid,))
gevent.sleep(random.randint(0, 10) * 0.1)
flag = self.event.wait()
print("I'm consumer %d. Flag is %r and now I can do something" % (pid, flag))
self.event.clear()
if __name__ == '__main__':
test = TestEvent()
test.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment