Skip to content

Instantly share code, notes, and snippets.

@youqingkui
Created September 8, 2016 03:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save youqingkui/b5029e3709fdbfdc9706709ce8ca61cf to your computer and use it in GitHub Desktop.
Save youqingkui/b5029e3709fdbfdc9706709ce8ca61cf to your computer and use it in GitHub Desktop.
#encoding=utf-8
'''
演示如何多进程的使用gevent,
1、gevent和multiprocessing组合使用会有很多问题,
所以多进程直接用subprocess.Popen,进程间不通过fork共享
任何数据,完全独立运行,并通过socket通信
2、进程间同步不能用multiprocessing.Event,
因为wait()的时候会阻塞住线程,其它协程的代码无法执行,也
不能使用gevent.event.Event(),因为它通过multiprocessing.Process
共享到子进程后,在父进程set(),子进程wait()是不会收到信号的
3、子进程内不能通过signal.signal(signal.SIGINT, signal.SIG_IGN)
忽略ctrl+c,所以启动主进程时如果没设置后台运行,在ctrl+c时,主进程
和子进程都会中止而不能优雅退出
4、主进程和子进程的通信和同步使用gevent.socket来实现,子进程收到
主进程断开连接事件(接受到零字节数据)时,自己优雅退出,相当于主进程
发消息告诉子进程让子进程退出
5、主进程启动时直接在后台运行,使用"nohup gevent-multil-process.py &"来运行,
测试时可不用nohup命令,停止主进程时使用kill pid的方式,在主进程里
会拦截SIGTERM信号,通知并等待子进程退出
'''
import gevent
import gevent.socket as socket
from gevent.event import Event
import os
import sys
import subprocess
import signal
url = ('localhost', 8888)
class Worker(object):
'''
子进程运行的代码,通过起一个协程来和主进程通信
包括接受任务分配请求,退出信号(零字节包),及反馈任务执行进度
然后主协程等待停止信号并中止进程(stop_event用于协程间同步)。
'''
def __init__(self, url):
self.url = url
self.stop_event = Event()
gevent.spawn(self.communicate)
self.stop_event.wait()
print 'worker(%s):will stop' % os.getpid()
def exec_task(self, task):
print 'worker(%s):execute task:%s' % (os.getpid(), task.rstrip('\n'))
def communicate(self):
print 'worker(%s):started' % os.getpid()
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(self.url)
fp = client.makefile()
while True:
line = fp.readline()
if not line:
self.stop_event.set()
break
'单独起一个协程去执行任务,防止通信协程阻塞'
gevent.spawn(self.exec_task, line)
class Master():
'''
主进程运行代码,启动单独协程监听一个端口以供子进程连接和通信用,
通过subprocess.Popen启动CPU个数个子进程,注册SIGTERM信号以便在
KILL自己时通知子进程退出,主协程等待停止事件并退出主
'''
def __init__(self, url):
self.url = url
self.workers = []
self.stop_event = Event()
gevent.spawn(self.communicate)
gevent.sleep(0) #让communicate协程有机会执行,否则子进程会先启动
self.process = [subprocess.Popen(('python',sys.argv[0],'worker'))
for i in xrange(3)] #启动multiprocessing.cpucount-1个子进程
gevent.signal(signal.SIGTERM, self.stop) #拦截kill信号
gevent.spawn(self.test) #测试分发任务
self.stop_event.wait()
def communicate(self):
print 'master(%s):started' % os.getpid()
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(url)
server.listen(1024)
while True:
worker, addr = server.accept()
print 'master(%s):new worker' % os.getpid()
self.workers.append(worker)
def stop(self):
print 'master stop'
for worker in self.workers:
worker.close()
for p in self.process:
p.wait()
self.stop_event.set()
def test(self):
import random
while True:
if not self.workers:
gevent.sleep(1)
continue
task = str(random.randint(100,10000))
worker = random.choice(self.workers)
worker.send(task)
worker.send('\n')
gevent.sleep(1)
if len(sys.argv) == 1:
Master(url)
else:
Worker(url)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment