Created
February 1, 2011 17:56
-
-
Save codysoyland/806272 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gevent | |
from gevent.server import StreamServer | |
from gevent.pool import Group | |
from gevent_zeromq import zmq | |
context = zmq.Context() | |
class CodependentGroup(Group):
    """
    A greenlet group whose members share one fate: the first time any
    greenlet is discarded from the group, a kill of the whole group is
    scheduled.
    """

    def discard(self, greenlet):
        """Remove ``greenlet`` from the group and schedule a one-time kill.

        The mere presence of the ``_killing`` attribute records that the
        kill was already scheduled, so subsequent discards return early
        instead of spawning another one.
        """
        super(CodependentGroup, self).discard(greenlet)
        if hasattr(self, '_killing'):
            return
        self._killing = True
        # The kill runs in its own greenlet rather than inline in discard().
        gevent.spawn(self.kill)
class SocketProxy(object):
    """TCP front end that bridges each accepted connection to a ZeroMQ peer.

    For every TCP client a new PAIR socket is bound to a random port, the
    port number is announced to the owner over a REQ socket, and two
    greenlets copy data between the TCP stream and the PAIR socket.
    """

    def __init__(self, hostport, owner_spec, bind_spec='tcp://127.0.0.1'):
        """Store the endpoints and connect the REQ socket to the owner.

        hostport   -- listen address handed to ``StreamServer``.
        owner_spec -- ZeroMQ endpoint the REQ socket connects to.
        bind_spec  -- address passed to ``bind_to_random_port`` for the
                      per-connection PAIR sockets.
        """
        self.hostport = hostport
        self.owner_spec = owner_spec
        self.bind_spec = bind_spec
        self.owner = context.socket(zmq.REQ)
        self.owner.connect(self.owner_spec)
        # Greenlets of the connections that are currently being proxied.
        self.connections = []

    def _forget_connection(self, greenlet):
        """Drop a finished proxy greenlet from ``self.connections``."""
        try:
            self.connections.remove(greenlet)
        except ValueError:
            # Already removed (e.g. by external code); nothing left to do.
            pass

    def handler(self, sock, address):
        """Set up the proxy for one accepted TCP connection.

        ``address`` is supplied by the server but not used here.
        """
        sockfile = sock.makefile()
        proxy = context.socket(zmq.PAIR)
        port = proxy.bind_to_random_port(self.bind_spec)
        worker = gevent.spawn(self.run_proxy, sockfile, proxy)
        self.connections.append(worker)
        # Without this the list would keep every finished greenlet forever.
        worker.link(self._forget_connection)
        # Tell the owner which port to connect its PAIR socket to, then wait
        # for its acknowledgement.
        # NOTE(review): the REQ socket is shared by all handler invocations;
        # two connections arriving at the same moment could interleave their
        # send/recv pairs on it -- confirm whether a lock is needed here.
        self.owner.send(str(port))
        self.owner.recv()

    def proxy_read(self, sockfile, zmq_sock):
        """Copy every message received on ``zmq_sock`` to the TCP stream."""
        while True:
            data = zmq_sock.recv()
            sockfile.write(data)
            # Flush per message so the client sees it immediately.
            sockfile.flush()

    def proxy_write(self, sockfile, zmq_sock):
        """Forward the TCP stream to ``zmq_sock`` one line at a time.

        Returns when ``readline()`` yields an empty value (end of stream).
        """
        while True:
            data = sockfile.readline()
            if not data:
                return
            zmq_sock.send(data)

    def run_proxy(self, sockfile, zmq_sock):
        """Run both copy directions until they stop, then release sockets.

        The two greenlets are placed in a ``CodependentGroup`` so that the
        end of one direction takes the other one down as well.
        """
        workers = [
            gevent.spawn(self.proxy_read, sockfile, zmq_sock),
            gevent.spawn(self.proxy_write, sockfile, zmq_sock),
        ]
        group = CodependentGroup(workers)
        try:
            # Join a stable list: the group's own ``greenlets`` collection
            # changes while greenlets finish.
            gevent.joinall(workers)
        finally:
            # If we are leaving early (e.g. this greenlet was killed), make
            # sure neither direction keeps using the sockets we close below.
            group.kill()
            try:
                sockfile.close()
            finally:
                zmq_sock.close()

    def serve_forever(self):
        """Accept TCP connections on ``self.hostport`` until stopped."""
        server = StreamServer(self.hostport, self.handler)
        server.serve_forever()
class SocketService(object):
    """Base class for services that receive connections from a proxy.

    The service binds a REP socket, receives a port number per request,
    acknowledges it, connects a PAIR socket to that port on 127.0.0.1 and
    runs ``handler`` with the PAIR socket in a new greenlet.
    """

    # Subclasses may define ``handler`` as a method; a callable passed to
    # ``__init__`` takes precedence over it.
    handler = None

    def __init__(self, listen_spec, handler=None):
        """Store the listen endpoint, pick the handler and run ``setup()``.

        listen_spec -- ZeroMQ endpoint the REP socket binds to.
        handler     -- optional callable taking the connected PAIR socket.
        """
        self.listen_spec = listen_spec
        # serve_forever() always calls the handler with the connected socket,
        # so the do-nothing fallback must accept that one argument (a
        # zero-argument lambda would raise TypeError on every connection).
        self.handler = handler or self.handler or (lambda sock: None)
        self.setup()

    def setup(self):
        """Hook for subclasses; called at the end of ``__init__``."""
        pass

    def serve_forever(self):
        """Answer port announcements forever, spawning one handler each."""
        listener = context.socket(zmq.REP)
        listener.bind(self.listen_spec)
        while True:
            port = listener.recv()
            # REP sockets must reply before the next request can be received.
            listener.send('OK')
            sock = context.socket(zmq.PAIR)
            sock.connect('tcp://127.0.0.1:' + port)
            gevent.spawn(self.handler, sock)
class ChatServer(SocketService):
    """Chat service: every line a client sends is published to all clients.

    Messages are fanned out through an in-process PUB socket; each client
    name gets a SUB socket connected to it.
    """

    pub_spec = 'inproc://chatpub'

    def setup(self):
        """Bind the publisher and start with no subscribers."""
        self.publisher()
        # Maps client name -> SUB socket.
        self.subscribers = {}

    def publisher(self):
        """Create the PUB socket and bind it to ``pub_spec``."""
        self.pub = context.socket(zmq.PUB)
        self.pub.bind(self.pub_spec)

    def subscriber(self, client):
        """Return the SUB socket for ``client['name']``, creating it once.

        Clients that give the same name receive the same SUB socket.
        """
        name = client['name']
        # ``in`` instead of dict.has_key(), which no longer exists in
        # Python 3.
        if name not in self.subscribers:
            sub = context.socket(zmq.SUB)
            sub.connect(self.pub_spec)
            # An empty prefix subscribes to every message.
            sub.setsockopt(zmq.SUBSCRIBE, "")
            # NOTE(review): the identity is set after connect(); ZeroMQ
            # documents most socket options as applying only to later
            # connects -- confirm whether this call has any effect.
            sub.setsockopt(zmq.IDENTITY, name)
            self.subscribers[name] = sub
        return self.subscribers[name]

    def inpipe(self, client):
        """Publish everything the client sends, prefixed with its name."""
        while True:
            message = client['socket'].recv()
            self.pub.send(client['name'] + ': ' + message)

    def outpipe(self, client):
        """Relay every published message to the client's socket."""
        sub = self.subscriber(client)
        while True:
            message = sub.recv()
            client['socket'].send(message)

    def handler(self, conn):
        """Ask the new connection for a name, then start both pipes."""
        conn.send('Please enter your name: ')
        name = conn.recv().strip()
        conn.send('Welcome back to the chat server, %s.\r\n' % name)
        client = {'name': name, 'socket': conn}
        gevent.spawn(self.inpipe, client)
        gevent.spawn(self.outpipe, client)
if __name__ == '__main__':
    # TCP clients connect to port 8080; the proxy announces each connection
    # to the chat server listening on port 9009.
    proxy = SocketProxy(('0.0.0.0', 8080), 'tcp://localhost:9009')
    server = ChatServer('tcp://*:9009')
    # One greenlet per service; block until both have finished.
    greenlets = [gevent.spawn(service.serve_forever) for service in (proxy, server)]
    gevent.joinall(greenlets)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment