Skip to content

Instantly share code, notes, and snippets.

@codysoyland
Created February 1, 2011 17:56
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save codysoyland/806272 to your computer and use it in GitHub Desktop.
Save codysoyland/806272 to your computer and use it in GitHub Desktop.
import gevent
from gevent.server import StreamServer
from gevent.pool import Group
from gevent_zeromq import zmq
# Module-level ZeroMQ context; the classes below create their sockets from it
# via context.socket(...).
context = zmq.Context()
class CodependentGroup(Group):
    """
    Greenlet group whose members share one fate: once any greenlet is
    discarded from the group, all the others are killed as well.
    """

    def discard(self, greenlet):
        """
        Remove *greenlet* from the group and, the first time this happens,
        schedule ``self.kill`` in a separate greenlet.

        NOTE(review): this relies on the base ``Group`` calling ``discard()``
        when a member finishes -- confirm against the installed gevent version.
        """
        super(CodependentGroup, self).discard(greenlet)
        if hasattr(self, '_killing'):
            # Teardown was already scheduled by an earlier discard; do not
            # spawn another kill.
            return
        self._killing = True
        gevent.spawn(self.kill)
class SocketProxy(object):
    """
    Accept plain TCP connections and relay each one over its own ZeroMQ
    PAIR socket.

    For every accepted connection the proxy binds a PAIR socket to a random
    port, announces that port to the owner over a REQ socket and then pumps
    data in both directions until one direction stops.
    """

    def __init__(self, hostport, owner_spec, bind_spec='tcp://127.0.0.1'):
        """
        :param hostport: address handed to ``StreamServer`` for listening.
        :param owner_spec: ZeroMQ endpoint the REQ socket connects to.
        :param bind_spec: base endpoint used for the per-connection PAIR
            sockets (a random port is chosen on it).
        """
        self.hostport = hostport
        self.owner_spec = owner_spec
        self.bind_spec = bind_spec
        self.owner = context.socket(zmq.REQ)
        self.owner.connect(self.owner_spec)
        # Greenlets of the connections that are currently being proxied.
        self.connections = []

    def _forget(self, greenlet):
        """Drop a finished proxy greenlet from ``self.connections``."""
        try:
            self.connections.remove(greenlet)
        except ValueError:
            # Already removed (or never tracked); nothing to do.
            pass

    def handler(self, sock, address):
        """
        Handle one accepted TCP connection: start its proxy greenlet and
        tell the owner which port the new PAIR socket is bound to.
        """
        sockfile = sock.makefile()
        proxy = context.socket(zmq.PAIR)
        port = proxy.bind_to_random_port(self.bind_spec)
        worker = gevent.spawn(self.run_proxy, sockfile, proxy)
        self.connections.append(worker)
        # Keep the list from growing without bound: forget the greenlet as
        # soon as it has finished.
        worker.link(self._forget)
        # NOTE(review): self.owner is shared by every handler invocation;
        # presumably concurrent connections could interleave send/recv on
        # the REQ socket -- confirm whether serialisation is needed.
        self.owner.send(str(port))
        self.owner.recv()

    def proxy_read(self, sockfile, zmq_sock):
        """Forward every message received on *zmq_sock* to *sockfile*."""
        while True:
            data = zmq_sock.recv()
            sockfile.write(data)
            sockfile.flush()

    def proxy_write(self, sockfile, zmq_sock):
        """
        Forward *sockfile* line by line to *zmq_sock*; returns once
        ``readline()`` yields an empty value (end of stream).
        """
        while True:
            data = sockfile.readline()
            if not data:
                return
            zmq_sock.send(data)

    def run_proxy(self, sockfile, zmq_sock):
        """
        Run both pump directions until they have finished, then release the
        connection's resources.
        """
        pumps = [
            gevent.spawn(self.proxy_read, sockfile, zmq_sock),
            gevent.spawn(self.proxy_write, sockfile, zmq_sock),
        ]
        # The group links the two pumps so that one ending takes the other
        # one down with it.
        group = CodependentGroup(pumps)
        try:
            gevent.joinall(pumps)
        finally:
            # If this greenlet was interrupted the pumps may still be alive;
            # stop them before their sockets are closed underneath them.
            gevent.killall(pumps)
            try:
                sockfile.close()
            finally:
                zmq_sock.close()

    def serve_forever(self):
        """Listen on ``self.hostport`` and serve connections until stopped."""
        server = StreamServer(self.hostport, self.handler)
        server.serve_forever()
class SocketService(object):
    """
    Base class for services that receive their clients over ZeroMQ PAIR
    sockets.

    The service listens on a REP socket. Each request carries a port number;
    the service acknowledges it, connects a PAIR socket to that port on
    127.0.0.1 and runs ``handler(sock)`` in a new greenlet.
    """

    # Subclasses may define ``handler`` as a method; an explicit ``handler``
    # argument to ``__init__`` takes precedence over it.
    handler = None

    def __init__(self, listen_spec, handler=None):
        """
        :param listen_spec: ZeroMQ endpoint the REP listener binds to.
        :param handler: optional callable invoked with each connected PAIR
            socket. Falls back to the class-level ``handler`` and finally to
            a no-op.
        """
        self.listen_spec = listen_spec
        # The fallback must accept the socket argument, because
        # serve_forever() always calls handler(sock).
        self.handler = handler or self.handler or (lambda sock: None)
        self.setup()

    def setup(self):
        """Hook for subclasses; called at the end of ``__init__``."""
        pass

    def serve_forever(self):
        """Accept port announcements forever, spawning a handler for each."""
        listener = context.socket(zmq.REP)
        try:
            listener.bind(self.listen_spec)
            while True:
                port = listener.recv()
                listener.send('OK')
                sock = context.socket(zmq.PAIR)
                sock.connect('tcp://127.0.0.1:' + port)
                gevent.spawn(self.handler, sock)
        finally:
            # Release the listening socket when the loop is interrupted.
            listener.close()
class ChatServer(SocketService):
    """
    Simple chat room: everything a client sends is published, prefixed with
    the client's name, to the subscriber socket of every name.
    """

    # In-process endpoint that connects the publisher to the subscribers.
    pub_spec = 'inproc://chatpub'

    def setup(self):
        """Create the publisher socket and the per-name subscriber table."""
        self.publisher()
        self.subscribers = {}

    def publisher(self):
        """Create the PUB socket and bind it to ``pub_spec``."""
        self.pub = context.socket(zmq.PUB)
        self.pub.bind(self.pub_spec)

    def subscriber(self, client):
        """
        Return the SUB socket for ``client['name']``, creating it on first
        use. The socket subscribes to every message and uses the name as its
        ZeroMQ identity.
        """
        name = client['name']
        if name not in self.subscribers:
            sub = context.socket(zmq.SUB)
            sub.connect(self.pub_spec)
            sub.setsockopt(zmq.SUBSCRIBE, "")
            sub.setsockopt(zmq.IDENTITY, name)
            self.subscribers[name] = sub
        return self.subscribers[name]

    def inpipe(self, client):
        """Publish every message received from the client, prefixed by name."""
        while True:
            message = client['socket'].recv()
            self.pub.send(client['name'] + ': ' + message)

    def outpipe(self, client):
        """Deliver every published message to the client's socket."""
        sub = self.subscriber(client)
        while True:
            message = sub.recv()
            client['socket'].send(message)

    def handler(self, conn):
        """
        Greet a new connection: ask for a name, then start the inbound and
        outbound pumps for that client in separate greenlets.
        """
        conn.send('Please enter your name: ')
        name = conn.recv().strip()
        conn.send('Welcome back to the chat server, %s.\r\n' % name)
        client = {'name': name, 'socket': conn}
        gevent.spawn(self.inpipe, client)
        gevent.spawn(self.outpipe, client)
if __name__ == '__main__':
    # TCP front end on port 8080, announcing connections to the chat service
    # endpoint on port 9009.
    tcp_front = SocketProxy(('0.0.0.0', 8080), 'tcp://localhost:9009')
    chat_back = ChatServer('tcp://*:9009')
    # Run both serve loops side by side and wait for them.
    running = [
        gevent.spawn(entry_point)
        for entry_point in (tcp_front.serve_forever, chat_back.serve_forever)
    ]
    gevent.joinall(running)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment