Created
December 6, 2013 08:19
-
-
Save flyer103/7820284 to your computer and use it in GitHub Desktop.
再次测试使用 gevent.event.Event
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/urs/bin/env python2.7 | |
#coding: utf-8 | |
"""Test the usage of 'gevent.event.Event' class. | |
""" | |
import random | |
import gevent | |
from gevent.event import Event | |
class TestEvent(object): | |
def __init__(self): | |
self.event = Event() | |
def run(self): | |
producers = [gevent.spawn(self._producer, i) for i in xrange(3)] | |
consumers = [gevent.spawn(self._consumer, i) for i in xrange(3)] | |
tasks = [] | |
tasks.extend(producers) | |
tasks.extend(consumers) | |
gevent.joinall(tasks) | |
def _producer(self, pid): | |
print("I'm producer %d and now I don't want consume to do something" % (pid,)) | |
self.event.clear() | |
sleeptime = random.randint(0, 5) * 0.01 | |
print("Sleeping time is %f" % (sleeptime, )) | |
gevent.sleep(sleeptime) | |
print("I'm producer %d and now consumer could do something." % (pid,)) | |
self.event.set() | |
def _consumer(self, pid): | |
print("I'm consumer %d and now I'm waiting for producer" % (pid,)) | |
gevent.sleep(random.randint(0, 10) * 0.1) | |
flag = self.event.wait() | |
print("I'm consumer %d. Flag is %r and now I can do something" % (pid, flag)) | |
self.event.clear() | |
if __name__ == '__main__': | |
test = TestEvent() | |
test.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment