Skip to content

Instantly share code, notes, and snippets.

@jason-s
Last active August 29, 2015 14:07
Show Gist options
  • Save jason-s/40e5f9784c4e8f78fe58 to your computer and use it in GitHub Desktop.
Save jason-s/40e5f9784c4e8f78fe58 to your computer and use it in GitHub Desktop.
import zmq
import codecs
import logging
logger = logging.getLogger(__name__)
def rot13(s):
    """Return *s* with the ROT13 substitution applied to ASCII letters.

    Accepts text (``str``) and, on Python 3, raw ``bytes`` as well.  The
    ``rot_13`` codec is a text-only transform on Python 3, so byte strings
    are handled with a translation table instead.  The return value has the
    same type as the argument, and non-letter characters pass through
    unchanged.
    """
    # On Python 2 ``bytes`` is ``str``, so this branch is only taken for
    # genuine Python 3 byte strings, which the codec would reject.
    if isinstance(s, bytes) and not isinstance(s, str):
        lower = b'abcdefghijklmnopqrstuvwxyz'
        upper = lower.upper()
        table = bytes.maketrans(
            lower + upper,
            lower[13:] + lower[:13] + upper[13:] + upper[:13],
        )
        return s.translate(table)
    return codecs.encode(s, 'rot_13')
class Rot13Server(object):
    """ZeroMQ ROUTER server that replies to each message with its ROT13 form.

    Every incoming multipart message is answered with a multipart message
    whose first frame is copied verbatim and whose remaining frames are
    passed through ``rot13``.
    """

    def __init__(self, port):
        """Store the TCP port to bind; the socket itself is created by ``run``."""
        self._port = port
        # Defined here so the attribute always exists; ``run`` replaces it
        # with the bound socket.
        self.socket = None

    def run(self):
        """Create a ROUTER socket bound to the configured port and serve on it."""
        context = zmq.Context()
        socket = context.socket(zmq.ROUTER)
        socket.bind('tcp://*:%d' % self._port)
        self.socket = socket
        self.launch(socket)

    def launch(self, socket):
        """Run the message loop on *socket* and close the socket afterwards."""
        try:
            self.messageLoop(socket)
        finally:
            # Close even if the loop exits through an exception it does not
            # handle itself (e.g. SystemExit).
            socket.close()

    def messageLoop(self, socket):
        """Poll *socket* and answer incoming messages until interrupted.

        A ``KeyboardInterrupt`` ends the loop silently.  Any other
        ``Exception`` is logged with its traceback and also ends the loop.
        """
        logger.info('Starting...')
        poller = zmq.Poller()
        poller.register(socket, zmq.POLLIN)
        pollTime = 100  # 0.1 second
        try:
            while True:
                sockets = dict(poller.poll(pollTime))
                if socket not in sockets:
                    continue
                msg = socket.recv_multipart()
                reply = self.onRecv(msg)
                if reply is not None:
                    socket.send_multipart(reply)
        except KeyboardInterrupt:
            pass
        except Exception:
            # Deliberately not a bare ``except``: SystemExit and friends
            # must propagate instead of being logged and swallowed.
            logger.exception("Aborting!!!")

    def onRecv(self, msg):
        """Build the reply for *msg*, a list of message frames.

        Frame 0 is kept as received; all later frames are ROT13-encoded.
        Returns the list of reply frames.
        """
        logger.info('Received message:\n%s', msg)
        reply = [frame if k == 0 else rot13(frame)
                 for (k, frame) in enumerate(msg)]
        logger.info('Sending message:\n%s', reply)
        return reply

    def send(self, msg):
        """Send the multipart message *msg* on the socket created by ``run``."""
        self.socket.send_multipart(msg)
if __name__ == '__main__':
    import sys
    logging.basicConfig(level=logging.INFO)
    # The optional first command-line argument selects the port.  Fall back
    # to the default only when it is missing or not an integer, instead of
    # swallowing every possible exception.
    try:
        port = int(sys.argv[1])
    except (IndexError, ValueError):
        port = 12345
    server = Rot13Server(port)
    server.run()
import zmq
from zmq.eventloop import ioloop, zmqstream
import threading
class IOBackgroundLoop(object):
    """Runs an ``ioloop.IOLoop`` on a dedicated daemon thread.

    The loop object is created by the thread itself and is exposed through
    the ``loop`` property, which is ``None`` until the thread has created it.
    """

    def __init__(self):
        """Prepare the worker thread and start-up event without starting anything."""
        self._started = threading.Event()
        self._loop = None
        worker = threading.Thread(target=self.run)
        worker.daemon = True
        self._thread = worker

    @property
    def loop(self):
        """The IOLoop created by the background thread, or ``None`` before that."""
        return self._loop

    def run(self):
        """Thread body: create the loop, make it current, signal, then run it."""
        background = self._loop = ioloop.IOLoop()
        background.initialize()
        background.make_current()
        # Signal only after the loop exists so ``start`` can return with
        # ``loop`` already populated.
        self._started.set()
        background.start()

    def start(self):
        """Start the background thread and block until it has signalled start-up."""
        self._thread.start()
        self._started.wait()
def run(port=12345):
    """Exercise a DEALER socket against a background IOLoop.

    Starts an ``IOBackgroundLoop``, reports the ``_running`` flag of the
    background loop and of ``ioloop.IOLoop.instance()``, schedules
    timestamped messages on both loops, then connects a DEALER socket to
    ``tcp://localhost:<port>``.  One message is sent and read synchronously;
    a second one is sent after a ``ZMQStream`` with an ``on_recv`` callback
    has been queued for set-up on the background loop.  Finally the calling
    thread sleeps for 10 seconds.

    *port* may be an int or a string of digits; it is converted with
    ``int`` before being formatted into the endpoint.
    """
    import time

    bkloop = IOBackgroundLoop()
    bkloop.start()
    loops = [(bkloop.loop, 'background'), (ioloop.IOLoop.instance(), 'main')]
    for loop, loopname in loops:
        print("%s %s running: %s" % (loopname, loop, loop._running))

    def print_timestamp(key):
        """Print the current time followed by *key*."""
        print("%s: %s" % (time.time(), key))

    def print_in_future(loop, message, dt):
        """Ask *loop* to print *message* with a timestamp *dt* seconds from now."""
        def queuer():
            loop.add_timeout(loop.time() + dt,
                             lambda: print_timestamp(message))
        # Hand the scheduling over via add_callback so that add_timeout is
        # invoked by the loop itself rather than by the calling thread.
        loop.add_callback(queuer)

    # NOTE(review): the original indentation was lost.  The whole sequence
    # is assumed to belong to the loop body because the last call uses the
    # loop variables -- confirm against the author's copy.
    for loop, loopname in loops:
        print_in_future(loop, "hi from %s" % loopname, 0.5)
        print_timestamp("here")
        time.sleep(1.0)
        print_timestamp("there")
        print_in_future(loop, "another hi from %s" % loopname, 0.5)

    ctx = zmq.Context()
    socket = ctx.socket(zmq.DEALER)
    # int() makes a port given as a string (e.g. taken from sys.argv) work
    # with the %d format below.
    socket.connect('tcp://localhost:%d' % int(port))
    # Bytes literals: identical to str on Python 2, required on Python 3.
    socket.send(b"Hey! (sync read)")
    print(socket.recv())

    def tada(msg):
        """on_recv callback: print the received message."""
        print("got %s" % (msg,))

    def setup_stream():
        """Wrap the socket in a ZMQStream on the loop current in this thread."""
        print("setting up on_recv")
        stream = zmqstream.ZMQStream(socket, io_loop=ioloop.IOLoop.current())
        stream.on_recv(tada)

    bkloop.loop.add_callback(setup_stream)
    socket.send(b"Hey! (async read)")
    time.sleep(10)
if __name__ == '__main__':
    import sys
    # The optional first command-line argument selects the port.  Convert
    # it to int so that the '%d' endpoint format in run() accepts it, and
    # fall back to the default only when it is missing or not an integer.
    try:
        port = int(sys.argv[1])
    except (IndexError, ValueError):
        port = 12345
    run(port)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment