-
-
Save papr/1f32d08202d5c130e15aee9509963b70 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import logging | |
import msgpack | |
import zmq | |
logger = logging.getLogger(__name__) | |
def main(ip_address, port): | |
ctx = zmq.Context() | |
requester = ctx.socket(zmq.REQ) | |
req_url = f"tcp://{ip_address}:{port}" | |
logger.debug(f"Connecting to {req_url=}") | |
requester.connect(req_url) | |
requester.send_string("SUB_PORT") | |
ipc_sub_port = requester.recv_string() | |
sub_url = f"tcp://{ip_address}:{ipc_sub_port}" | |
logger.debug(f"Connecting to {sub_url=}") | |
monitor = Msg_Receiver(ctx, sub_url, topics=("notify.",)) | |
while True: | |
try: | |
_, msg = monitor.recv() | |
logger.info(msg) | |
except KeyboardInterrupt: | |
break | |
class Msg_Receiver: | |
def __init__(self, ctx, url, topics=(), hwm=None): | |
self.socket = zmq.Socket(ctx, zmq.SUB) | |
assert type(topics) != str | |
if hwm is not None: | |
self.socket.set_hwm(hwm) | |
self.socket.connect(url) | |
for t in topics: | |
self.subscribe(t) | |
def subscribe(self, topic): | |
self.socket.subscribe(topic) | |
def unsubscribe(self, topic): | |
self.socket.unsubscribe(topic) | |
def recv(self): | |
"""Recv a message with topic, payload. | |
Topic is a utf-8 encoded string. Returned as unicode object. | |
Payload is a msgpack serialized dict. Returned as a python dict. | |
Any addional message frames will be added as a list | |
in the payload dict with key: '__raw_data__' . | |
""" | |
topic = self.recv_topic() | |
remaining_frames = self.recv_remaining_frames() | |
payload = self.deserialize_payload(*remaining_frames) | |
return topic, payload | |
def recv_topic(self): | |
return self.socket.recv_string() | |
def recv_remaining_frames(self): | |
while self.socket.get(zmq.RCVMORE): | |
yield self.socket.recv() | |
def deserialize_payload(self, payload_serialized, *extra_frames): | |
payload = msgpack.unpackb(payload_serialized) | |
if extra_frames: | |
payload["__raw_data__"] = extra_frames | |
return payload | |
@property | |
def new_data(self): | |
return self.socket.get(zmq.EVENTS) & zmq.POLLIN | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser() | |
parser.add_argument("-ip", "--ip_address", type=str, default="127.0.0.1") | |
parser.add_argument("-p", "--port", type=int, default=50020) | |
args = parser.parse_args() | |
handlers = [ | |
logging.StreamHandler(), | |
logging.FileHandler( | |
f"monitor_notifications_{args.ip_address}_{args.port}.log", mode="w" | |
), | |
] | |
for handler in handlers: | |
handler.setFormatter( | |
logging.Formatter( | |
"%(asctime)s - %(processName)s - [%(levelname)s] %(name)s: %(message)s" | |
) | |
) | |
logging.basicConfig(level=logging.DEBUG, handlers=handlers) | |
main(args.ip_address, args.port) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment