Skip to content

Instantly share code, notes, and snippets.

@mxh
Created December 12, 2017 13:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mxh/2a2e89c92fc47b24a1f2f011df058d12 to your computer and use it in GitHub Desktop.
Save mxh/2a2e89c92fc47b24a1f2f011df058d12 to your computer and use it in GitHub Desktop.
class Server(threading.Thread):
    """Thread that batches requests arriving on a ZeroMQ ROUTER socket.

    Three-frame messages are collected in ``self.queue`` until either
    ``batch_size`` of them are waiting or no further message arrives within
    ``batch_timeout_ms``.  The batch is then passed to ``Processor.process``
    and every message it returns is sent back out through the same socket.
    """

    def __init__(self, endpoint='tcp://*:5570', batch_size=8,
                 batch_timeout_ms=100):
        """Store the configuration; sockets are created later, in ``run``.

        Args:
            endpoint: Address the ROUTER socket binds to.
            batch_size: Maximum number of messages per batch (at least 1).
            batch_timeout_ms: Milliseconds to wait for another message once
                a batch has been started.  ``None`` waits indefinitely, i.e.
                a batch is only processed once it is full.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        threading.Thread.__init__(self)
        if batch_size < 1:
            # A zero-sized batch would make run() spin on empty batches.
            raise ValueError('batch_size must be at least 1')
        self.endpoint = endpoint
        self.batch_size = batch_size
        self.batch_timeout_ms = batch_timeout_ms
        self.queue = []

    def run(self):
        """Bind the socket and serve batches until an exception ends the loop.

        The socket and context are released on the way out so a crashed
        server thread does not keep the endpoint bound.
        """
        self.context = zmq.Context()
        self.frontend = self.context.socket(zmq.ROUTER)
        try:
            self.frontend.bind(self.endpoint)
            self.poller = zmq.Poller()
            self.poller.register(self.frontend, zmq.POLLIN)
            self.processor = Processor()
            while True:
                self._fill_queue()
                if not self.queue:
                    # Nothing usable was collected; do not process an
                    # empty batch.
                    continue
                # process queue
                tprint('Processing batch of size %d' % len(self.queue))
                result = self.processor.process(self.queue)
                for message in result:
                    self.frontend.send_multipart(message)
                self.queue = []
        finally:
            # linger=0 so that terminating the context cannot block on
            # replies that were never delivered.
            self.frontend.close(linger=0)
            self.context.term()

    def _fill_queue(self):
        """Append incoming messages to ``self.queue`` until a batch is ready.

        Returns when the queue holds ``batch_size`` messages, or when a poll
        reports nothing to read on the frontend socket.
        """
        while len(self.queue) < self.batch_size:
            # Block without limit while the batch is empty, but only for a
            # bounded time once something is queued; otherwise a partial
            # batch would never be processed.
            timeout = self.batch_timeout_ms if self.queue else None
            sockets = dict(self.poller.poll(timeout))
            if self.frontend not in sockets:
                break
            frames = self.frontend.recv_multipart()
            if len(frames) != 3:
                # Anything other than [ident, zf, msg] cannot be unpacked;
                # drop it instead of letting ValueError kill the thread.
                tprint('Server dropped message with %d frames' % len(frames))
                continue
            ident, zf, msg = frames
            tprint('Server received %s from %s' % (msg, ident))
            self.queue.append([ident, zf, msg])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment