Last active
August 29, 2015 14:07
-
-
Save jason-s/40e5f9784c4e8f78fe58 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import zmq | |
import codecs | |
import logging | |
logger = logging.getLogger(__name__) | |
def rot13(s):
    """Return *s* with ROT13 applied to its ASCII letters.

    Accepts text as well as byte strings.  On Python 3 the ``rot_13``
    codec is text-only, so a ``bytes`` argument is round-tripped through
    latin-1 (a lossless byte <-> code point mapping) and returned as
    ``bytes`` again.  Non-letter characters/bytes pass through unchanged.
    """
    if isinstance(s, bytes) and not isinstance(s, str):
        # Only true on Python 3 (on Python 2 ``bytes is str``), where
        # codecs.encode(b'...', 'rot_13') would raise TypeError.
        text = s.decode('latin-1')
        return codecs.encode(text, 'rot_13').encode('latin-1')
    return codecs.encode(s, 'rot_13')
class Rot13Server(object):
    """ZeroMQ ROUTER server that replies to each message with its ROT13 form.

    The first frame of every multipart message is sent back untouched
    (presumably the ROUTER routing identity -- confirm against the
    client); every later frame is passed through ``rot13``.
    """

    def __init__(self, port):
        """Store the TCP port to bind; no socket exists until ``run``."""
        self._port = port

    def run(self):
        """Create a ROUTER socket, bind it on all interfaces and serve.

        The socket is also kept on ``self.socket`` so that ``send`` can
        use it.  Returns once the message loop ends.
        """
        context = zmq.Context()
        socket = context.socket(zmq.ROUTER)
        socket.bind('tcp://*:%d' % self._port)
        self.socket = socket
        self.launch(socket)

    def launch(self, socket):
        """Run the message loop on *socket* and always close it afterwards."""
        try:
            self.messageLoop(socket)
        finally:
            # Close even if something the loop does not handle
            # (e.g. SystemExit) propagates out of it.
            socket.close()

    def messageLoop(self, socket):
        """Poll *socket* forever, answering each message via ``onRecv``.

        Ends silently on ``KeyboardInterrupt``.  Any other ``Exception``
        is logged with its traceback and also ends the loop.  Exceptions
        outside the ``Exception`` hierarchy (``SystemExit``,
        ``GeneratorExit``) are no longer swallowed and propagate.
        """
        logger.info('Starting...')
        poller = zmq.Poller()
        poller.register(socket, zmq.POLLIN)
        pollTime = 100  # poll timeout: 0.1 second
        try:
            while True:
                ready = dict(poller.poll(pollTime))
                if socket not in ready:
                    # Timed out with nothing to read; poll again.
                    continue
                msg = socket.recv_multipart()
                reply = self.onRecv(msg)
                if reply is not None:
                    socket.send_multipart(reply)
        except KeyboardInterrupt:
            pass
        except Exception:
            logger.exception("Aborting!!!")

    def onRecv(self, msg):
        """Build the reply for *msg*, a sequence of frames.

        Returns a list whose first frame equals ``msg[0]`` and whose
        remaining frames are the ROT13 of the corresponding input frames.
        Both the request and the reply are logged at INFO level.
        """
        logger.info('Received message:\n%s', msg)
        reply = [frame if k == 0 else rot13(frame)
                 for (k, frame) in enumerate(msg)]
        logger.info('Sending message:\n%s', reply)
        return reply

    def send(self, msg):
        """Send the multipart message *msg* on the serving socket.

        ``self.socket`` is only assigned by ``run``, so calling this
        before ``run`` raises ``AttributeError``.
        """
        self.socket.send_multipart(msg)
if __name__ == '__main__':
    # Script entry point: python <script> [port]
    import sys
    logging.basicConfig(level=logging.INFO)
    try:
        port = int(sys.argv[1])
    except (IndexError, ValueError):
        # No port argument, or not an integer: fall back to the default.
        port = 12345
    server = Rot13Server(port)
    server.run()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import zmq | |
from zmq.eventloop import ioloop, zmqstream | |
import threading | |
class IOBackgroundLoop(object):
    """Runs an ``ioloop.IOLoop`` on its own daemon thread.

    The loop object is created inside the worker thread; ``start``
    blocks until it exists and has been made current in that thread.
    """

    def __init__(self):
        """Prepare the (not yet started) worker thread and start signal."""
        self._started = threading.Event()
        self._loop = None
        worker = threading.Thread(target=self.run)
        worker.daemon = True
        self._thread = worker

    @property
    def loop(self):
        """The IOLoop owned by the worker thread, or ``None`` before it runs."""
        return self._loop

    def run(self):
        """Thread body: create the loop, signal readiness, run the loop."""
        loop = self._loop = ioloop.IOLoop()
        loop.initialize()
        loop.make_current()
        # Release start() only once the loop is published and current.
        self._started.set()
        loop.start()

    def start(self):
        """Start the worker thread and wait until its loop has been set up."""
        self._thread.start()
        self._started.wait()
def run(port=12345):
    """Demo: timers and a ZMQ stream on a background IOLoop vs. the main one.

    Starts an ``IOBackgroundLoop``, schedules timestamped prints on both
    the background loop and ``ioloop.IOLoop.instance()`` (which this
    function never starts), then talks to a server on ``localhost:port``
    through a DEALER socket: one blocking request/reply, followed by a
    request whose reply is delivered to an ``on_recv`` callback registered
    from the background loop.

    port -- TCP port of the server; an int or a numeric string.
    """
    import time

    # Accept "12345" as well as 12345: the %d format below rejects strings.
    port = int(port)

    bkloop = IOBackgroundLoop()
    bkloop.start()
    loops = [(bkloop.loop, 'background'), (ioloop.IOLoop.instance(), 'main')]
    for loop, loopname in loops:
        print("%s %s running: %s" % (loopname, loop, loop._running))

    def print_timestamp(key):
        """Print the current time followed by *key*."""
        print("%s: %s" % (time.time(), key))

    def print_in_future(loop, message, dt):
        """Ask *loop* to print *message* *dt* seconds after it runs the request."""
        def queuer():
            loop.add_timeout(loop.time() + dt,
                             lambda: print_timestamp(message))
        # The timeout is registered from a callback on the loop itself
        # rather than directly from the calling thread.
        loop.add_callback(queuer)

    # NOTE(review): the source this was recovered from had lost its
    # indentation; the four statements after the first print_in_future
    # call are assumed to belong to this loop because the last one uses
    # ``loop``/``loopname`` -- confirm against the original.
    for loop, loopname in loops:
        print_in_future(loop, "hi from %s" % loopname, 0.5)
        print_timestamp("here")
        time.sleep(1.0)
        print_timestamp("there")
        print_in_future(loop, "another hi from %s" % loopname, 0.5)

    ctx = zmq.Context()
    socket = ctx.socket(zmq.DEALER)
    socket.connect('tcp://localhost:%d' % port)

    # Bytes literals: identical to plain str on Python 2, and the form
    # pyzmq requires for send() on Python 3.
    socket.send(b"Hey! (sync read)")
    print(socket.recv())

    def tada(msg):
        """on_recv callback: print the received message."""
        print("got %s" % msg)

    def setup_stream():
        """Wrap the socket in a ZMQStream on whichever loop runs this."""
        print("setting up on_recv")
        stream = zmqstream.ZMQStream(socket, io_loop=ioloop.IOLoop.current())
        stream.on_recv(tada)

    bkloop.loop.add_callback(setup_stream)
    socket.send(b"Hey! (async read)")
    # Keep the process alive so the daemon-thread loop can deliver the reply.
    time.sleep(10)
if __name__ == '__main__':
    # Script entry point: python <script> [port]
    import sys
    try:
        # run() formats the port with %d, so it must be an integer,
        # not the raw command-line string.
        port = int(sys.argv[1])
    except (IndexError, ValueError):
        # No port argument, or not an integer: fall back to the default.
        port = 12345
    run(port)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment