Skip to content

Instantly share code, notes, and snippets.

@linusmotu linusmotu/base.py
Created Jul 11, 2019

Embed
What would you like to do?
Demonstration of using pub-sub with ZMQstream base classes.
import multiprocessing
from zmq.eventloop import ioloop, zmqstream
from zmq.utils import jsonapi as json
import zmq
decode = lambda x: x.decode('utf-8')
encode = lambda x: x.encode('ascii')
class ZmqProcess(multiprocessing.Process):
"""
This is the base for all processes and offers utility functions
for setup and creating new streams.
"""
def __init__(self):
super().__init__()
self.context = None
"""The ØMQ :class:`~zmq.Context` instance."""
self.loop = None
"""PyZMQ's event loop (:class:`~zmq.eventloop.ioloop.IOLoop`)."""
def setup(self):
"""
Creates a :attr:`context` and an event :attr:`loop` for the process.
"""
self.context = zmq.Context()
self.loop = ioloop.IOLoop.instance()
def stream(self, sock_type, addr, bind, callback=None, subscribe=b''):
"""
Creates a :class:`~zmq.eventloop.zmqstream.ZMQStream`.
:param sock_type: The ØMQ socket type (e.g. ``zmq.REQ``)
:param addr: Address to bind or connect to formatted as *host:port*,
*(host, port)* or *host* (bind to random port).
If *bind* is ``True``, *host* may be:
- the wild-card ``*``, meaning all available interfaces,
- the primary IPv4 address assigned to the interface, in its
numeric representation or
- the interface name as defined by the operating system.
If *bind* is ``False``, *host* may be:
- the DNS name of the peer or
- the IPv4 address of the peer, in its numeric representation.
If *addr* is just a host name without a port and *bind* is
``True``, the socket will be bound to a random port.
:param bind: Binds to *addr* if ``True`` or tries to connect to it
otherwise.
:param callback: A callback for
:meth:`~zmq.eventloop.zmqstream.ZMQStream.on_recv`, optional
:param subscribe: Subscription pattern for *SUB* sockets, optional,
defaults to ``b''``.
:returns: A tuple containg the stream and the port number.
"""
sock = self.context.socket(sock_type)
# addr may be 'host:port' or ('host', port)
if isinstance(addr, str):
addr = addr.split(':')
host, port = addr if len(addr) == 2 else (addr[0], None)
# Bind/connect the socket
if bind:
if port:
sock.bind('tcp://%s:%s' % (host, port))
else:
port = sock.bind_to_random_port('tcp://%s' % host)
else:
sock.connect('tcp://%s:%s' % (host, port))
# Add a default subscription for SUB sockets
if sock_type == zmq.SUB:
sock.setsockopt(zmq.SUBSCRIBE, subscribe)
# Create the stream and add the callback
stream = zmqstream.ZMQStream(sock, self.loop)
if callback:
stream.on_recv(callback)
return stream, int(port)
class MessageHandler(object):
"""
Base class for message handlers for a :class:`ZMQProcess`.
Inheriting classes only need to implement a handler function for each
message type.
"""
def __init__(self, json_load=-1):
print("MessageHandler init()")
self._json_load = json_load
def __call__(self, msg):
print("MessageHandler:{}".format(msg))
"""
Gets called when a messages is received by the stream this handlers is
registered at. *msg* is a list as return by
:meth:`zmq.core.socket.Socket.recv_multipart`.
"""
# Try to JSON-decode the index "self._json_load" of the message
i = self._json_load
msg_type, data = json.loads(msg[i])
print("MH: {}, {}".format(msg_type, data))
msg[i] = data
print("Msg_Type: ", type(msg_type))
# Get the actual message handler and call it
if msg_type.startswith('_'):
raise AttributeError('%s starts with an "_"' % msg_type)
getattr(self, msg_type)(*msg)
class RouterMessageHandler(object):
def __init__(self, json_load=-1):
self._json_load = json_load
def __call__(self, msg):
"""
Gets called when a messages is received by the stream this handlers is
registered at. *msg* is a list as return by
:meth:`zmq.core.socket.Socket.recv_multipart`.
"""
i = self._json_load
msg_type = decode(msg[i])
# Rest of array is the message itself
del msg[i]
# Get the actual message handler and call it
if msg_type.startswith('_'):
raise AttributeError('%s starts with an "_"' % msg_type)
getattr(self, msg_type)(*msg)
import zmq
import base
from multiprocessing import Process
import random
from zmq.eventloop import ioloop, zmqstream
from zmq.utils import jsonapi as json
import time
from string import ascii_uppercase as uppercase
host = '127.0.0.1'
port = 5558
def server_pub(port="5558"):
context = zmq.Context()
socket = context.socket(zmq.XPUB)
socket.bind("tcp://*:%s" % port)
print("Running server on port: ", port)
time.sleep(1)
for _ in range(10):
# Wait for next request from client
string = "%s-%05d" % (uppercase[random.randint(0,3)], random.randint(0,100000))
# You can send it in the format described in above [topic-data]
# socket.send(string.encode('utf-8'))
# As part of a multipart
# string = "A-baby"
# print("Publishing: {}".format(string))
# socket.send_multipart([string.encode('utf-8'), b"TEST"])
# Or with the topic listed as the first part of the multipart
print("[A, YES]")
socket.send_multipart([b"A-", b"YES"])
socket.send_multipart([b"status", b"something", b"YES"])
# You can also add payloads
time.sleep(0.1)
def ping():
"""Sends ping requests and waits for replies."""
context = zmq.Context()
sock = context.socket(zmq.REQ)
sock.connect('tcp://%s:%s' % (host, port))
for i in range(5):
sock.send_json(['ping', i])
rep = sock.recv_json()
print('Ping got reply:', rep)
sock.send_json(['plzdiekthxbye', None])
class SubProc(base.ZmqProcess):
"""
Main processes for the Ponger. It handles ping requests and sends back
a pong.
"""
def __init__(self, bind_addr):
super().__init__()
self.bind_addr = bind_addr
self.rep_streamA = None
self.rep_streamB = None
def callback(self, msg):
print("Processing ... %s" % msg)
def setup(self):
"""Sets up PyZMQ and creates all streams."""
super().setup()
# Create the stream and add the message handler
self.rep_streamA, _ = self.stream(zmq.SUB, self.bind_addr, bind=False, subscribe=b"A-")
self.rep_streamB, _ = self.stream(zmq.SUB, self.bind_addr, bind=False, subscribe=b"status")
self.rep_streamA.on_recv(self.callback)
self.rep_streamB.on_recv(SubStreamHandler(self.stop))
def run(self):
"""Sets up everything and starts the event loop."""
self.setup()
self.loop.start()
def stop(self):
"""Stops the event loop."""
self.loop.stop()
class SubStreamHandler(base.RouterMessageHandler):
"""Handels messages arrvinge at the PongProc’s REP stream."""
def __init__(self, stop):
super().__init__(json_load=0)
self._stop = stop
def status(self, *data):
print("Received: {}".format(data))
print("Data:", data[0], data[1])
def plzdiekthxbye(self, data):
"""Just calls :meth:`PongProc.stop`."""
self._stop()
class PongProc(base.ZmqProcess):
"""
Main processes for the Ponger. It handles ping requests and sends back
a pong.
"""
def __init__(self, bind_addr):
super().__init__()
self.bind_addr = bind_addr
self.rep_stream = None
self.ping_handler = PingHandler()
def setup(self):
"""Sets up PyZMQ and creates all streams."""
super().setup()
# Create the stream and add the message handler
self.rep_stream, _ = self.stream(zmq.REP, self.bind_addr, bind=True)
self.rep_stream.on_recv(RepStreamHandler(self.rep_stream, self.stop,
self.ping_handler))
def run(self):
"""Sets up everything and starts the event loop."""
self.setup()
self.loop.start()
def stop(self):
"""Stops the event loop."""
self.loop.stop()
class RepStreamHandler(base.MessageHandler):
"""Handels messages arrvinge at the PongProc’s REP stream."""
def __init__(self, rep_stream, stop, ping_handler):
super().__init__()
self._rep_stream = rep_stream
self._stop = stop
self._ping_handler = ping_handler
def ping(self, data):
"""Send back a pong."""
rep = self._ping_handler.make_pong(data)
self._rep_stream.send_json(rep)
def plzdiekthxbye(self, data):
"""Just calls :meth:`PongProc.stop`."""
self._stop()
class PingHandler(object):
def make_pong(self, num_pings):
"""Creates and returns a pong message."""
print('Pong got request number %s' % num_pings)
return ['pong', num_pings]
if __name__ == '__main__':
# pong_proc = PongProc(bind_addr=(host, port))
# pong_proc.start()
sub_proc = SubProc(bind_addr=("localhost", port))
sub_proc.start()
server_pub()
# ping()
# time.sleep(3)
# pong_proc.join()
sub_proc.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.