Created
October 18, 2023 14:27
-
-
Save jacopoabramo/395823c62ec1597714762d32016fcb26 to your computer and use it in GitHub Desktop.
ZeroMQ message broker within napari widgets
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import napari | |
import zmq | |
import logging | |
import threading | |
from PyQt5.QtWidgets import QWidget, QPushButton, QVBoxLayout | |
from napari.qt import thread_worker | |
# IPC endpoints shared by the broker (which binds them) and the widgets
# (which connect to them).
FRONTEND_ENDPOINT = "ipc://routing-front.ipc"
BACKEND_ENDPOINT = "ipc://routing-back.ipc"

# Emit INFO-level records so the broker/widget traffic logs are visible.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class MessageBroker(threading.Thread):
    """Background thread that forwards ``ACTION`` messages between sockets.

    Two sockets are created on the shared ZeroMQ context:

    * ``back``  -- a ROUTER socket bound to ``BACKEND_ENDPOINT``; incoming
      messages are read from it.
    * ``front`` -- a DEALER socket bound to ``FRONTEND_ENDPOINT``; messages
      recognised as ``ACTION`` are re-sent on it.
    """

    def __init__(self) -> None:
        """Create both sockets, assign their identities and bind them."""
        super().__init__()
        context = zmq.Context.instance()

        # The identity must be configured *before* bind(): libzmq applies
        # most socket options only to binds/connects performed afterwards.
        # setup frontend
        self.front: zmq.Socket = context.socket(zmq.DEALER)
        self.front.setsockopt_string(zmq.IDENTITY, "MESSAGE_BROKER_FRONT")
        self.front.bind(FRONTEND_ENDPOINT)

        # setup backend
        self.back: zmq.Socket = context.socket(zmq.ROUTER)
        self.back.setsockopt_string(zmq.IDENTITY, "MESSAGE_BROKER_BACK")
        self.back.bind(BACKEND_ENDPOINT)

    def run(self) -> None:
        """Receive messages on ``back`` forever and relay ``ACTION`` ones.

        Every received multipart message is logged. When its second frame is
        ``b"ACTION"`` the complete message (all frames, unchanged) is sent
        out through ``front``; any other message is only logged.
        """
        logger.info("[BROKER] Starting loop")
        while True:
            msg = self.back.recv_multipart()
            # Frames are decoded for logging only; undecodable bytes are
            # replaced so a binary frame cannot terminate the loop.
            readable = [b.decode(errors="replace") for b in msg]
            logger.info(f"[BROKER] received message: {readable}")
            # NOTE(review): frame 0 is presumably the sender identity that
            # the ROUTER socket prepends, which makes frame 1 the command.
            if msg[1] == b"ACTION":
                self.front.send_multipart(msg)
class NapariProcessingWidget(QWidget):
    """Qt widget with one button that sends a greeting over ZeroMQ.

    On construction the widget connects a ROUTER socket to
    ``FRONTEND_ENDPOINT`` and a DEALER socket to ``BACKEND_ENDPOINT``, sends
    a ``REGISTER`` message on the DEALER socket, and starts a worker that
    logs everything received on the ROUTER socket.
    """

    def __init__(self, viewer: napari.Viewer, widget_name: str):
        """Build the sockets, the button layout and the listener worker.

        Args:
            viewer: napari viewer instance; stored on the widget.
            widget_name: name used as prefix of both socket identities and
                in the log/greeting messages.
        """
        super().__init__()
        self.viewer = viewer
        self.widget_name = widget_name

        context = zmq.Context.instance()
        self.front_socket: zmq.Socket = context.socket(zmq.ROUTER)
        self.back_socket: zmq.Socket = context.socket(zmq.DEALER)

        # Identities are assigned before connect() so that they are in
        # effect for the connection being established.
        self.front_socket.setsockopt_string(zmq.IDENTITY, f"{self.widget_name}_front")
        self.front_socket.connect(FRONTEND_ENDPOINT)
        self.back_socket.setsockopt_string(zmq.IDENTITY, f"{self.widget_name}_back")
        self.back_socket.connect(BACKEND_ENDPOINT)

        self._register()

        self.button = QPushButton("Click to send message")
        self.button.clicked.connect(self.poll)
        layout = QVBoxLayout()
        layout.addWidget(self.button)
        self.setLayout(layout)

        # listen_front is wrapped by @thread_worker, so calling it returns a
        # worker object that still has to be started explicitly.
        self.listener = self.listen_front()
        self.listener.start()

    @thread_worker
    def listen_front(self):
        """Log every multipart message received on ``front_socket``, forever."""
        logger.info(f"[{self.widget_name}] Starting listener...")
        while True:
            frames = self.front_socket.recv_multipart()
            # Decoding is for display only; replace undecodable bytes (e.g. a
            # binary routing id) instead of letting UnicodeDecodeError end
            # the listener loop.
            readable = [b.decode(errors="replace") for b in frames]
            logger.info(f"[{self.widget_name}] Received message: {readable}")

    def poll(self):
        """Send an ``ACTION`` message carrying a greeting on ``back_socket``."""
        self.back_socket.send_multipart(
            [b"ACTION", f"Greetings from {self.widget_name}".encode()]
        )

    def _register(self):
        """Send a single-frame ``REGISTER`` message on ``back_socket``."""
        self.back_socket.send_multipart([b"REGISTER"])
# Run the message broker in the background on a separate thread. It is marked
# as a daemon thread so its endless receive loop does not keep the process
# alive after the main thread finishes.
message_broker = MessageBroker()
message_broker.daemon = True  # Thread.setDaemon() is deprecated since Python 3.10
message_broker.start()

# Create the viewer and dock two demo widgets on its right-hand side.
viewer = napari.Viewer()
test_widgets = [
    NapariProcessingWidget(viewer, widget_name=f"test_widget_{idx}")
    for idx in range(2)
]
for test_widget in test_widgets:
    viewer.window.add_dock_widget(test_widget, area="right")

napari.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment