Skip to content

Instantly share code, notes, and snippets.

@svvitale
Created April 16, 2014 02:51
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save svvitale/e5bb806a754184455dc4 to your computer and use it in GitHub Desktop.
Save svvitale/e5bb806a754184455dc4 to your computer and use it in GitHub Desktop.
Mixin enabling any Python class to be a ZMQ inproc listener
class _Singleton(type):
"""Singleton class from Stack Overflow:
http://stackoverflow.com/a/6798042
"""
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(_Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]
class Singleton(_Singleton('SingletonMeta', (object,), {})):
    """Convenience base class: inherit from it to make a class a singleton.

    The base is produced by calling the ``_Singleton`` metaclass directly, which
    attaches the metaclass without relying on either the Python 2 or the
    Python 3 metaclass declaration syntax.
    """
import unittest2
import zmqextended
import zmq
import time
import threading
class InprocListenerMixinTest(unittest2.TestCase):
    """Tests for zmqextended.InprocListenerMixin."""

    def test_class_method(self):
        """Test that the InprocListenerMixin has correctly implemented the inproc_addr class method.
        """
        # assertEqual rather than the deprecated assertEquals alias, which newer
        # unittest releases no longer provide.
        self.assertEqual(zmqextended.InprocListenerMixin.default_inproc_addr(), "inproc://InprocListenerMixin")

    def test_mixin(self):
        """Test that we can create a class that listens just by deriving from the InprocListenerMixin.
        """
        class INeedToListen(zmqextended.InprocListenerMixin, object):
            def __init__(self):
                super(INeedToListen, self).__init__(self.handler)
                self.handled_requests = 0

            def handler(self, options):
                # Count the request and echo it back as the reply.
                self.handled_requests += 1
                return options

        # Create an instance of our local class. It will start listening immediately.
        test_instance = INeedToListen()

        # Test to make sure we got a correct inproc address: the class-based prefix
        # followed by a per-instance suffix.
        self.assertTrue(test_instance.inproc_addr().startswith("inproc://INeedToListen"))
        self.assertNotEqual(test_instance.inproc_addr(), "inproc://INeedToListen")

        # Send a message to our local instance and wait for the reply.
        test_zmqsend = zmqextended.ZMQSender(test_instance.inproc_addr())
        reply = test_zmqsend.send_sync({"expected": 1})

        # Verify that the message was handled and that the handler's return value
        # came back to the sender.
        self.assertEqual(test_instance.handled_requests, 1)
        self.assertEqual(reply, {"expected": 1})
import os
import zmq
import threading
from singleton import Singleton
class ZMQContext(zmq.Context, Singleton):
    """A zmq.Context that is also a Singleton.

    Because of the Singleton base, repeated ``ZMQContext()`` calls hand back the
    same context object, so sockets created through it share one context.
    """

    def __init__(self):
        # Takes no arguments of its own; the underlying context is built with
        # zmq.Context's defaults.
        super(ZMQContext, self).__init__()
class ZMQThread(threading.Thread):
    """Abstract base class for ZMQ listener/sender threads.

    Handles management of the stop event and enforces that subclasses override
    run() before the thread can be started.
    """

    # Poll timeout, in milliseconds, used by subclasses between checks of the
    # stop event.
    DEFAULT_POLLING_INTERVAL_MS = 100

    def __init__(self, addr):
        """Prepare the thread without starting it.

        addr -- ZMQ endpoint address the subclass will bind or connect to.
        """
        # Call the base threading.Thread constructor and set up this thread as a
        # daemon thread (so it will terminate on program exit).
        super(ZMQThread, self).__init__()
        self.daemon = True

        # Set up a cancellation event
        self._stop_event = threading.Event()

        # Save off the address
        self._addr = addr

        # Initialize request/reply members
        self._request = self._reply = None
        self._handler = None

    def start(self):
        """Start the thread.

        Raises NotImplementedError if the subclass did not override run().
        """
        # Enforce that we have a run method defined in our child class. Python 2
        # wraps class-level functions in unbound methods (exposing __func__),
        # while Python 3 returns the plain function, so unwrap only when the
        # attribute is present to compare the underlying functions portably.
        current_run = getattr(self.run, "__func__", self.run)
        placeholder_run = getattr(ZMQThread.run, "__func__", ZMQThread.run)
        if current_run is placeholder_run:
            raise NotImplementedError("%s requires a run() method" % (self.__class__.__name__,))

        # Call the parent's method to start the thread.
        super(ZMQThread, self).start()

    def run(self):
        """Placeholder thread body; subclasses must override it."""
        raise NotImplementedError("%s requires a run() method" % (self.__class__.__name__,))

    def stop(self):
        """Ask the thread to finish by setting the cancellation event."""
        self._stop_event.set()
class ZMQListener(ZMQThread):
    """Thread that binds a ZMQ endpoint and answers incoming python objects."""

    def __init__(self, addr, handler):
        """Construct a ZMQ listener.

        addr -- ZMQ endpoint address on which we will be listening.
        handler -- Callback invoked for every request received. It is given the
            request as a python object, and whatever it returns is sent back to
            the sender as the reply.

        Raises ValueError when handler is not callable.
        """
        super(ZMQListener, self).__init__(addr)

        # Reject anything that cannot be invoked as a function or method.
        if not hasattr(handler, "__call__"):
            raise ValueError("handler argument must be callable!")

        self._handler = handler

    def run(self):
        """Thread worker: serve requests until the stop event is set."""
        sock = ZMQContext().socket(zmq.DEALER)

        try:
            # Start listening on the configured address.
            sock.bind(self._addr)

            while not self._stop_event.is_set():
                # Wake up periodically so a stop request is noticed promptly.
                if sock.poll(ZMQThread.DEFAULT_POLLING_INTERVAL_MS) == 0:
                    continue

                # A message is waiting: decode it, run the handler, reply.
                incoming = sock.recv_pyobj()
                outgoing = self._handler(incoming)
                sock.send_pyobj(outgoing)
        finally:
            # The socket is closed however the loop ends.
            sock.close()
class ZMQSender(ZMQThread):
    """Sends one python object to a ZMQ endpoint, either blocking or on a thread."""

    def send_sync(self, request):
        """Send a request and block until its response arrives.

        request -- Any valid python object (serializable by pickle) to send.

        returns -- Response as a python object.
        """
        # No callback in synchronous mode; the worker runs on the caller's thread.
        self._handler = None
        self._request = request
        self.run()
        return self._reply

    def send_async(self, request, handler):
        """Send a request on a background thread.

        request -- Any valid python object (serializable by pickle) to send.
        handler -- Callback invoked with the response (as a python object) once
            it is received.

        Raises ValueError when handler is not callable.
        """
        # Reject anything that cannot be invoked as a function or method.
        if not hasattr(handler, "__call__"):
            raise ValueError("Handler argument must be callable!")

        self._handler = handler
        self._request = request
        self.start()

    def run(self):
        """Thread worker: send the stored request and wait for one reply."""
        sock = ZMQContext().socket(zmq.DEALER)

        try:
            sock.connect(self._addr)
            sock.send_pyobj(self._request)

            # Wait for a response, waking periodically to honour a stop request.
            while not self._stop_event.is_set():
                if sock.poll(ZMQThread.DEFAULT_POLLING_INTERVAL_MS) == 0:
                    continue

                # Keep the reply for send_sync and hand it to the callback, if any.
                self._reply = sock.recv_pyobj()
                if self._handler is not None:
                    self._handler(self._reply)
                return
        finally:
            # The socket is closed however the wait ends.
            sock.close()
class InprocListenerMixin:
    """Derive from this mixin to add inproc ZMQ message handling to any python object."""

    @classmethod
    def default_inproc_addr(cls):
        """Return the inproc address prefix derived from the class name."""
        return "inproc://" + cls.__name__

    @staticmethod
    def _random_suffix():
        """Return 16 random bytes rendered as 32 lowercase hex characters."""
        # Hex keeps the address printable text. Raw os.urandom() output cannot be
        # concatenated to a str on Python 3 and may contain NUL or other
        # unprintable bytes on Python 2.
        return "".join("%02x" % octet for octet in bytearray(os.urandom(16)))

    def __init__(self, handler):
        """Initializes the ZMQ listener on the inproc socket, and assigns the message handler.

        handler -- a callable taking one parameter which is the Python object being received.
            The handler's return value will be forwarded on to the sender as the message response.
        """
        # Per-instance address: class-based prefix plus a random suffix.
        self._inproc_addr = self.default_inproc_addr() + "-" + self._random_suffix()
        self._listener = ZMQListener(self._inproc_addr, handler)
        self._listener.start()

    def inproc_addr(self):
        """Return the inproc address this instance is listening on."""
        return self._inproc_addr

    def __del__(self):
        """Stop and join the listener thread if it's still running."""
        # __init__ may have raised before _listener was assigned (for example when
        # the handler is rejected), so look it up defensively.
        listener = getattr(self, "_listener", None)
        if listener is not None and listener.is_alive():
            listener.stop()
            listener.join(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment