Created
July 11, 2019 07:45
-
-
Save jptalusan/449e0f2f2e3bc447946d7717b237fea1 to your computer and use it in GitHub Desktop.
Demonstration of using pub-sub with ZMQStream base classes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import multiprocessing | |
from zmq.eventloop import ioloop, zmqstream | |
from zmq.utils import jsonapi as json | |
import zmq | |
def decode(x):
    """Return the text obtained by decoding the bytes frame *x* as UTF-8."""
    return x.decode('utf-8')


def encode(x):
    """
    Return the bytes obtained by encoding the text *x* as UTF-8.

    UTF-8 mirrors :func:`decode`, so ``decode(encode(s)) == s`` holds for any
    string. Pure-ASCII input produces exactly the same bytes as an ASCII
    encoding would.
    """
    return x.encode('utf-8')
class ZmqProcess(multiprocessing.Process):
    """
    This is the base for all processes and offers utility functions
    for setup and creating new streams.
    """

    def __init__(self):
        super().__init__()

        self.context = None
        """The ØMQ :class:`~zmq.Context` instance."""

        self.loop = None
        """PyZMQ's event loop (:class:`~zmq.eventloop.ioloop.IOLoop`)."""

    def setup(self):
        """
        Creates a :attr:`context` and an event :attr:`loop` for the process.
        """
        self.context = zmq.Context()
        self.loop = ioloop.IOLoop.instance()

    def stream(self, sock_type, addr, bind, callback=None, subscribe=b''):
        """
        Creates a :class:`~zmq.eventloop.zmqstream.ZMQStream`.

        :param sock_type: The ØMQ socket type (e.g. ``zmq.REQ``)
        :param addr: Address to bind or connect to formatted as *host:port*,
                *(host, port)* or *host* (bind to random port).
                If *bind* is ``True``, *host* may be:

                - the wild-card ``*``, meaning all available interfaces,
                - the primary IPv4 address assigned to the interface, in its
                  numeric representation or
                - the interface name as defined by the operating system.

                If *bind* is ``False``, *host* may be:

                - the DNS name of the peer or
                - the IPv4 address of the peer, in its numeric representation.

                If *addr* is just a host name without a port and *bind* is
                ``True``, the socket will be bound to a random port.
        :param bind: Binds to *addr* if ``True`` or tries to connect to it
                otherwise.
        :param callback: A callback for
                :meth:`~zmq.eventloop.zmqstream.ZMQStream.on_recv`, optional
        :param subscribe: Subscription pattern for *SUB* sockets, optional,
                defaults to ``b''``.
        :returns: A tuple containing the stream and the port number.
        :raises ValueError: If *bind* is ``False`` and *addr* carries no port.
        """
        # addr may be 'host:port' or ('host', port)
        if isinstance(addr, str):
            addr = addr.split(':')
        host, port = addr if len(addr) == 2 else (addr[0], None)

        # A random port only makes sense when binding. Reject a port-less
        # connect before a socket is created, so nothing is left open and
        # the caller gets a clear error instead of a failure on int(None).
        if not bind and not port:
            raise ValueError(
                'cannot connect to %r: a port is required when bind is False'
                % (host,))

        sock = self.context.socket(sock_type)

        # Bind/connect the socket
        if bind:
            if port:
                sock.bind('tcp://%s:%s' % (host, port))
            else:
                port = sock.bind_to_random_port('tcp://%s' % host)
        else:
            sock.connect('tcp://%s:%s' % (host, port))

        # Add a default subscription for SUB sockets
        if sock_type == zmq.SUB:
            sock.setsockopt(zmq.SUBSCRIBE, subscribe)

        # Create the stream and add the callback
        stream = zmqstream.ZMQStream(sock, self.loop)
        if callback:
            stream.on_recv(callback)

        return stream, int(port)
class MessageHandler(object):
    """
    Base class for message handlers for a :class:`ZmqProcess`.

    Inheriting classes only need to implement a handler function for each
    message type.
    """

    def __init__(self, json_load=-1):
        """
        :param json_load: Index of the frame in a multipart message that holds
                the JSON-encoded ``[msg_type, data]`` pair; defaults to ``-1``
                (the last frame).
        """
        print("MessageHandler init()")
        self._json_load = json_load

    def __call__(self, msg):
        """
        Gets called when a message is received by the stream this handler is
        registered at. *msg* is a list as returned by
        :meth:`zmq.core.socket.Socket.recv_multipart`.

        The frame at index ``json_load`` is JSON-decoded into
        ``[msg_type, data]`` and replaced in *msg* by ``data``; the method
        named ``msg_type`` is then called with all frames as positional
        arguments.

        :raises AttributeError: If ``msg_type`` starts with an underscore or
                no handler method of that name exists.
        """
        print("MessageHandler:{}".format(msg))

        # Try to JSON-decode the index "self._json_load" of the message
        i = self._json_load
        msg_type, data = json.loads(msg[i])
        print("MH: {}, {}".format(msg_type, data))
        msg[i] = data
        print("Msg_Type: ", type(msg_type))

        # Refuse underscore-prefixed names so a message cannot reach
        # private attributes through getattr.
        if msg_type.startswith('_'):
            raise AttributeError('%s starts with an "_"' % msg_type)

        # Get the actual message handler and call it
        getattr(self, msg_type)(*msg)
class RouterMessageHandler(object):
    """
    Message handler base whose message type travels as a raw frame of its
    own instead of inside a JSON payload.
    """

    def __init__(self, json_load=-1):
        # Index of the frame that names the handler method.
        self._json_load = json_load

    def __call__(self, msg):
        """
        Dispatch a received multipart message to the matching handler method.

        *msg* is the list of frames handed over by the stream. The frame at
        index ``json_load`` is decoded as UTF-8 to obtain the handler name and
        removed from *msg*; the remaining frames become the handler's
        positional arguments.

        :raises AttributeError: If the name starts with an underscore or no
                such handler method exists.
        """
        type_index = self._json_load
        handler_name = msg[type_index].decode('utf-8')

        # Drop the type frame only after it decoded successfully; what is
        # left is the message itself.
        msg.pop(type_index)

        # Names with a leading underscore are never dispatched.
        if handler_name.startswith('_'):
            raise AttributeError('%s starts with an "_"' % handler_name)

        handler = getattr(self, handler_name)
        handler(*msg)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import zmq | |
import base | |
from multiprocessing import Process | |
import random | |
from zmq.eventloop import ioloop, zmqstream | |
from zmq.utils import jsonapi as json | |
import time | |
from string import ascii_uppercase as uppercase | |
# Endpoint of the demo: ping() connects to host:port, and the __main__ block
# reuses port for the subscriber process.
host, port = '127.0.0.1', 5558
def server_pub(port="5558"):
    """
    Publish ten rounds of demo messages on an XPUB socket.

    Every round sends two multipart messages whose first frame is the topic:
    ``[b"A-", b"YES"]`` and ``[b"status", b"something", b"YES"]``.

    :param port: TCP port to bind to on all interfaces (``tcp://*:<port>``).
    """
    context = zmq.Context()
    socket = context.socket(zmq.XPUB)
    try:
        socket.bind("tcp://*:%s" % port)
        print("Running server on port: ", port)

        # Presumably gives subscribers time to connect before the first
        # message goes out.
        time.sleep(1)

        for _ in range(10):
            # A message may also be sent as a single frame in the
            # "topic-data" format (e.g. b"A-00042"), or as a multipart
            # message with further payload frames. Here the topic is the
            # first frame of a multipart message.
            print("[A, YES]")
            socket.send_multipart([b"A-", b"YES"])
            socket.send_multipart([b"status", b"something", b"YES"])
            time.sleep(0.1)
    finally:
        # Release the socket and context explicitly instead of leaving them
        # to interpreter shutdown.
        socket.close()
        context.term()
def ping():
    """
    Send five numbered ping requests over a REQ socket, printing each reply,
    then send a final ``plzdiekthxbye`` request.
    """
    ctx = zmq.Context()
    requester = ctx.socket(zmq.REQ)
    endpoint = 'tcp://%s:%s' % (host, port)
    requester.connect(endpoint)

    for seq in range(5):
        request = ['ping', seq]
        requester.send_json(request)
        reply = requester.recv_json()
        print('Ping got reply:', reply)

    # No reply is awaited for the final request.
    requester.send_json(['plzdiekthxbye', None])
class SubProc(base.ZmqProcess):
    """
    Subscriber process of the pub-sub demo.

    It connects two SUB streams to the same address: one subscribed to
    ``A-`` whose messages go to :meth:`callback`, and one subscribed to
    ``status`` whose messages go to a :class:`SubStreamHandler`.
    """

    def __init__(self, bind_addr):
        super().__init__()

        # Both streams stay unset until setup() runs.
        self.rep_streamA = None
        self.rep_streamB = None
        # Despite its name, this is the address the SUB sockets connect to.
        self.bind_addr = bind_addr

    def callback(self, msg):
        """Print the frames of a message received on the ``A-`` stream."""
        print("Processing ... %s" % msg)

    def setup(self):
        """Sets up PyZMQ, then creates both SUB streams with their handlers."""
        super().setup()

        peer = self.bind_addr

        # stream() registers the given callback through on_recv.
        self.rep_streamA, _ = self.stream(
            zmq.SUB, peer, bind=False,
            callback=self.callback, subscribe=b"A-")
        self.rep_streamB, _ = self.stream(
            zmq.SUB, peer, bind=False,
            callback=SubStreamHandler(self.stop), subscribe=b"status")

    def run(self):
        """Process entry point: run setup(), then start the event loop."""
        self.setup()
        self.loop.start()

    def stop(self):
        """Stops the event loop."""
        self.loop.stop()
class SubStreamHandler(base.RouterMessageHandler):
    """Handles messages arriving at the SubProc's ``status`` SUB stream."""

    def __init__(self, stop):
        """
        :param stop: Callable invoked by :meth:`plzdiekthxbye`.
        """
        # The handler name is carried in the first frame of each message.
        super().__init__(json_load=0)
        self._stop = stop

    def status(self, *data):
        """
        Print the payload frames of a ``status`` message.

        All frames are printed as a tuple, followed by the first two frames
        individually. Messages with fewer than two payload frames print
        whatever is present instead of raising :class:`IndexError`.
        """
        print("Received: {}".format(data))
        print("Data:", *data[:2])

    def plzdiekthxbye(self, data=None, *_extra):
        """
        Call the ``stop`` callable given at construction.

        Payload frames are ignored. Any number of them is accepted because
        the dispatcher passes however many frames follow the type frame.
        """
        self._stop()
class PongProc(base.ZmqProcess):
    """
    Main process for the Ponger: it binds a REP stream, handles ping requests
    and sends back a pong.
    """

    def __init__(self, bind_addr):
        super().__init__()

        self.ping_handler = PingHandler()
        # Created in setup().
        self.rep_stream = None
        self.bind_addr = bind_addr

    def setup(self):
        """Sets up PyZMQ, binds the REP stream and attaches its handler."""
        super().setup()

        self.rep_stream, _ = self.stream(zmq.REP, self.bind_addr, bind=True)

        # The handler needs the stream itself to send replies, so it can
        # only be built once the stream exists.
        handler = RepStreamHandler(
            self.rep_stream, self.stop, self.ping_handler)
        self.rep_stream.on_recv(handler)

    def run(self):
        """Process entry point: run setup(), then start the event loop."""
        self.setup()
        self.loop.start()

    def stop(self):
        """Stops the event loop."""
        self.loop.stop()
class RepStreamHandler(base.MessageHandler):
    """Handles messages arriving at the PongProc's REP stream."""

    def __init__(self, rep_stream, stop, ping_handler):
        super().__init__()
        self._ping_handler = ping_handler
        self._stop = stop
        self._rep_stream = rep_stream

    def ping(self, data):
        """Build a pong for *data* and send it back as JSON on the REP stream."""
        self._rep_stream.send_json(self._ping_handler.make_pong(data))

    def plzdiekthxbye(self, data):
        """Just calls the ``stop`` callable (:meth:`PongProc.stop`)."""
        self._stop()
class PingHandler(object): | |
def make_pong(self, num_pings): | |
"""Creates and returns a pong message.""" | |
print('Pong got request number %s' % num_pings) | |
return ['pong', num_pings] | |
if __name__ == '__main__':
    # Only the pub-sub demo is active. The REQ/REP ping-pong demo is kept
    # below, commented out.
    # pong_proc = PongProc(bind_addr=(host, port))
    # pong_proc.start()

    # Start the subscriber process first, then publish from this process.
    subscriber = SubProc(bind_addr=("localhost", port))
    subscriber.start()

    server_pub()
    # ping()
    # time.sleep(3)

    # pong_proc.join()
    # NOTE(review): nothing visible here stops the subscriber's event loop,
    # so this join presumably blocks until the process is interrupted.
    subscriber.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment