Created
September 8, 2016 03:04
-
-
Save youqingkui/b5029e3709fdbfdc9706709ce8ca61cf to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#encoding=utf-8 | |
''' | |
演示如何多进程的使用gevent, | |
1、gevent和multiprocessing组合使用会有很多问题, | |
所以多进程直接用subprocess.Popen,进程间不通过fork共享 | |
任何数据,完全独立运行,并通过socket通信 | |
2、进程间同步不能用multiprocessing.Event, | |
因为wait()的时候会阻塞住线程,其它协程的代码无法执行,也 | |
不能使用gevent.event.Event(),因为它通过multiprocessing.Process | |
共享到子进程后,在父进程set(),子进程wait()是不会收到信号的 | |
3、子进程内不能通过signal.signal(signal.SIGINT, signal.SIG_IGN) | |
忽略ctrl+c,所以启动主进程时如果没设置后台运行,在ctrl+c时,主进程 | |
和子进程都会中止而不能优雅退出 | |
4、主进程和子进程的通信和同步使用gevent.socket来实现,子进程收到 | |
主进程断开连接事件(接受到零字节数据)时,自己优雅退出,相当于主进程 | |
发消息告诉子进程让子进程退出 | |
5、主进程启动时直接在后台运行,使用"nohup gevent-multil-process.py &"来运行, | |
测试时可不用nohup命令,停止主进程时使用kill pid的方式,在主进程里 | |
会拦截SIGTERM信号,通知并等待子进程退出 | |
''' | |
import gevent | |
import gevent.socket as socket | |
from gevent.event import Event | |
import os | |
import sys | |
import subprocess | |
import signal | |
url = ('localhost', 8888) | |
class Worker(object): | |
''' | |
子进程运行的代码,通过起一个协程来和主进程通信 | |
包括接受任务分配请求,退出信号(零字节包),及反馈任务执行进度 | |
然后主协程等待停止信号并中止进程(stop_event用于协程间同步)。 | |
''' | |
def __init__(self, url): | |
self.url = url | |
self.stop_event = Event() | |
gevent.spawn(self.communicate) | |
self.stop_event.wait() | |
print 'worker(%s):will stop' % os.getpid() | |
def exec_task(self, task): | |
print 'worker(%s):execute task:%s' % (os.getpid(), task.rstrip('\n')) | |
def communicate(self): | |
print 'worker(%s):started' % os.getpid() | |
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
client.connect(self.url) | |
fp = client.makefile() | |
while True: | |
line = fp.readline() | |
if not line: | |
self.stop_event.set() | |
break | |
'单独起一个协程去执行任务,防止通信协程阻塞' | |
gevent.spawn(self.exec_task, line) | |
class Master(): | |
''' | |
主进程运行代码,启动单独协程监听一个端口以供子进程连接和通信用, | |
通过subprocess.Popen启动CPU个数个子进程,注册SIGTERM信号以便在 | |
KILL自己时通知子进程退出,主协程等待停止事件并退出主 | |
''' | |
def __init__(self, url): | |
self.url = url | |
self.workers = [] | |
self.stop_event = Event() | |
gevent.spawn(self.communicate) | |
gevent.sleep(0) #让communicate协程有机会执行,否则子进程会先启动 | |
self.process = [subprocess.Popen(('python',sys.argv[0],'worker')) | |
for i in xrange(3)] #启动multiprocessing.cpucount-1个子进程 | |
gevent.signal(signal.SIGTERM, self.stop) #拦截kill信号 | |
gevent.spawn(self.test) #测试分发任务 | |
self.stop_event.wait() | |
def communicate(self): | |
print 'master(%s):started' % os.getpid() | |
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
server.bind(url) | |
server.listen(1024) | |
while True: | |
worker, addr = server.accept() | |
print 'master(%s):new worker' % os.getpid() | |
self.workers.append(worker) | |
def stop(self): | |
print 'master stop' | |
for worker in self.workers: | |
worker.close() | |
for p in self.process: | |
p.wait() | |
self.stop_event.set() | |
def test(self): | |
import random | |
while True: | |
if not self.workers: | |
gevent.sleep(1) | |
continue | |
task = str(random.randint(100,10000)) | |
worker = random.choice(self.workers) | |
worker.send(task) | |
worker.send('\n') | |
gevent.sleep(1) | |
if len(sys.argv) == 1: | |
Master(url) | |
else: | |
Worker(url) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment