Skip to content

Instantly share code, notes, and snippets.

@papr
Created November 12, 2021 13:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save papr/1f32d08202d5c130e15aee9509963b70 to your computer and use it in GitHub Desktop.
Save papr/1f32d08202d5c130e15aee9509963b70 to your computer and use it in GitHub Desktop.
import argparse
import logging
import msgpack
import zmq
logger = logging.getLogger(__name__)
def main(ip_address, port):
    """Log every ``notify.`` message published by a ZMQ endpoint.

    Sends a ``SUB_PORT`` request to ``tcp://{ip_address}:{port}``, uses the
    reply as the port to subscribe on, and then logs each received payload at
    INFO level until the process is interrupted with Ctrl-C.

    Args:
        ip_address: Host name or IP address of the endpoint.
        port: Port of the endpoint's request socket.
    """
    ctx = zmq.Context()

    # The request socket is only needed for the one-off port lookup, so it is
    # closed immediately afterwards (even if the lookup fails) instead of
    # staying open for the lifetime of the process.
    requester = ctx.socket(zmq.REQ)
    try:
        req_url = f"tcp://{ip_address}:{port}"
        logger.debug(f"Connecting to {req_url=}")
        requester.connect(req_url)
        requester.send_string("SUB_PORT")
        ipc_sub_port = requester.recv_string()
    finally:
        requester.close(linger=0)

    sub_url = f"tcp://{ip_address}:{ipc_sub_port}"
    logger.debug(f"Connecting to {sub_url=}")
    monitor = Msg_Receiver(ctx, sub_url, topics=("notify.",))

    while True:
        try:
            # The topic is not needed; only the decoded payload is logged.
            _, msg = monitor.recv()
            logger.info(msg)
        except KeyboardInterrupt:
            break

    # Clean shutdown after Ctrl-C: every socket must be closed before the
    # context can terminate without blocking.
    monitor.socket.close(linger=0)
    ctx.term()
class Msg_Receiver:
    """Receive (topic, payload) multipart messages on a ZMQ SUB socket."""

    def __init__(self, ctx, url, topics=(), hwm=None):
        """Create a SUB socket, connect it to *url* and subscribe to *topics*.

        Args:
            ctx: ZMQ context the socket is created in.
            url: Endpoint the socket connects to.
            topics: Iterable of topics to subscribe to. Must not be a single
                string; wrap a lone topic in a tuple or list.
            hwm: Optional high-water mark, applied before connecting.

        Raises:
            TypeError: If *topics* is a single string.
        """
        # Validate before creating the socket so a rejected call leaks nothing.
        # A bare string would otherwise be iterated character by character,
        # subscribing to every single letter. An explicit raise (rather than
        # an assert) keeps the check active under ``python -O``.
        if isinstance(topics, str):
            raise TypeError(
                "topics must be an iterable of topic strings, not a single str"
            )
        self.socket = zmq.Socket(ctx, zmq.SUB)
        if hwm is not None:
            self.socket.set_hwm(hwm)
        self.socket.connect(url)
        for t in topics:
            self.subscribe(t)

    def subscribe(self, topic):
        """Subscribe the socket to *topic*."""
        self.socket.subscribe(topic)

    def unsubscribe(self, topic):
        """Remove the socket's subscription to *topic*."""
        self.socket.unsubscribe(topic)

    def recv(self):
        """Receive one message and return it as ``(topic, payload)``.

        The first frame is the topic, returned as a string. The second frame
        is a msgpack-serialized dict, returned as a Python dict. Any
        additional frames are stored, in order, as a tuple of raw frames
        under the payload key ``'__raw_data__'``.

        Raises:
            TypeError: If the message consists of the topic frame only, since
                there is then no payload frame to deserialize.
        """
        topic = self.recv_topic()
        remaining_frames = self.recv_remaining_frames()
        payload = self.deserialize_payload(*remaining_frames)
        return topic, payload

    def recv_topic(self):
        """Receive the next frame and return it decoded as a string."""
        return self.socket.recv_string()

    def recv_remaining_frames(self):
        """Yield the remaining raw frames of the current multipart message."""
        while self.socket.get(zmq.RCVMORE):
            yield self.socket.recv()

    def deserialize_payload(self, payload_serialized, *extra_frames):
        """Unpack the msgpack payload and attach any extra frames to it.

        Args:
            payload_serialized: msgpack-encoded payload frame.
            *extra_frames: Further raw frames; if any are given they are
                stored as a tuple under ``'__raw_data__'``.

        Returns:
            The unpacked payload.
        """
        payload = msgpack.unpackb(payload_serialized)
        if extra_frames:
            payload["__raw_data__"] = extra_frames
        return payload

    @property
    def new_data(self):
        """Non-zero (truthy) when the socket has a message ready to read."""
        return self.socket.get(zmq.EVENTS) & zmq.POLLIN
if __name__ == "__main__":
    # Command-line entry point: parse the endpoint address, set up logging to
    # both the console and a per-endpoint log file, then start monitoring.
    parser = argparse.ArgumentParser()
    parser.add_argument("-ip", "--ip_address", type=str, default="127.0.0.1")
    parser.add_argument("-p", "--port", type=int, default=50020)
    args = parser.parse_args()

    # mode="w" starts the log file from scratch on every run.
    console_output = logging.StreamHandler()
    file_output = logging.FileHandler(
        f"monitor_notifications_{args.ip_address}_{args.port}.log", mode="w"
    )
    log_outputs = [console_output, file_output]

    # Both outputs use the same record layout.
    record_layout = (
        "%(asctime)s - %(processName)s - [%(levelname)s] %(name)s: %(message)s"
    )
    for log_output in log_outputs:
        log_output.setFormatter(logging.Formatter(record_layout))

    logging.basicConfig(level=logging.DEBUG, handlers=log_outputs)
    main(args.ip_address, args.port)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment