Skip to content

Instantly share code, notes, and snippets.

@jacopoabramo
Created October 18, 2023 14:27
Show Gist options
  • Save jacopoabramo/395823c62ec1597714762d32016fcb26 to your computer and use it in GitHub Desktop.
Save jacopoabramo/395823c62ec1597714762d32016fcb26 to your computer and use it in GitHub Desktop.
ZeroMQ message broker within napari widgets
import napari
import zmq
import logging
import threading
from PyQt5.QtWidgets import QWidget, QPushButton, QVBoxLayout
from napari.qt import thread_worker
# Logging is configured once at import time; the broker and the widgets
# below all report through this module-level logger at INFO level.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ipc:// endpoints shared by the whole script: the broker binds both of
# them and every widget connects to both of them.
FRONTEND_ENDPOINT = "ipc://routing-front.ipc"
BACKEND_ENDPOINT = "ipc://routing-back.ipc"
class MessageBroker(threading.Thread):
    """Thread that relays ``ACTION`` messages from the backend to the frontend.

    A ROUTER socket bound to ``BACKEND_ENDPOINT`` receives multipart
    messages. A message whose frame at index 1 equals ``b"ACTION"`` is
    forwarded unchanged through a DEALER socket bound to
    ``FRONTEND_ENDPOINT``; any other message is only logged.
    """

    def __init__(self) -> None:
        """Create both sockets, give them their identities and bind them."""
        super().__init__()
        context = zmq.Context.instance()

        # The identity must be set BEFORE bind(): libzmq applies this option
        # only to binds/connects made afterwards, so setting it on an
        # already-bound socket leaves that endpoint without the identity.
        # setup frontend
        self.front: zmq.Socket = context.socket(zmq.DEALER)
        self.front.setsockopt_string(zmq.IDENTITY, 'MESSAGE_BROKER_FRONT')
        self.front.bind(FRONTEND_ENDPOINT)

        # setup backend
        self.back: zmq.Socket = context.socket(zmq.ROUTER)
        self.back.setsockopt_string(zmq.IDENTITY, 'MESSAGE_BROKER_BACK')
        self.back.bind(BACKEND_ENDPOINT)

    @staticmethod
    def _decode_frames(frames) -> list:
        """Return *frames* as text for logging.

        Bytes that are not valid UTF-8 are replaced with U+FFFD instead of
        raising, so an unexpected binary frame cannot kill the broker thread.
        """
        return [frame.decode(errors="replace") for frame in frames]

    @staticmethod
    def _is_action(frames) -> bool:
        """Return True when *frames* has a frame at index 1 equal to ``b"ACTION"``.

        Messages with fewer than two frames are reported as non-actions
        rather than raising ``IndexError``.
        """
        return len(frames) > 1 and frames[1] == b"ACTION"

    def run(self):
        """Receive backend messages forever, forwarding the ``ACTION`` ones."""
        logger.info("[BROKER] Starting loop")
        while True:
            msg = self.back.recv_multipart()
            logger.info(f"[BROKER] received message: {self._decode_frames(msg)}")
            if self._is_action(msg):
                self.front.send_multipart(msg)
class NapariProcessingWidget(QWidget):
    """Dock widget that exchanges ZeroMQ messages with the broker endpoints.

    The widget owns two sockets: a DEALER connected to ``BACKEND_ENDPOINT``
    for sending (``REGISTER`` once at construction, ``ACTION`` on each
    button click) and a ROUTER connected to ``FRONTEND_ENDPOINT`` that a
    background worker reads from and logs.

    Args:
        viewer: The napari viewer this widget is created for; stored as
            ``self.viewer``.
        widget_name: Name used as the prefix of both socket identities and
            in every log/message string.
    """

    def __init__(self, viewer: napari.Viewer, widget_name: str):
        super().__init__()
        self.viewer = viewer
        self.widget_name = widget_name

        context = zmq.Context.instance()
        self.front_socket: zmq.Socket = context.socket(zmq.ROUTER)
        self.back_socket: zmq.Socket = context.socket(zmq.DEALER)

        # Identities are assigned before connect() so they apply to the
        # connections made right after.
        self.front_socket.setsockopt_string(zmq.IDENTITY, f"{self.widget_name}_front")
        self.front_socket.connect(FRONTEND_ENDPOINT)
        self.back_socket.setsockopt_string(zmq.IDENTITY, f"{self.widget_name}_back")
        self.back_socket.connect(BACKEND_ENDPOINT)
        self._register()

        self.button = QPushButton("Click to send message")
        self.button.clicked.connect(self.poll)
        layout = QVBoxLayout()
        layout.addWidget(self.button)
        self.setLayout(layout)

        # listen_front is wrapped by @thread_worker: calling it returns a
        # worker object that only begins running once start() is called.
        self.listener = self.listen_front()
        self.listener.start()

    @thread_worker
    def listen_front(self):
        """Log every multipart message received on the front socket, forever."""
        logger.info(f"[{self.widget_name}] Starting listener...")
        while True:
            frames = self.front_socket.recv_multipart()
            # errors="replace": a frame that is not valid UTF-8 (for example
            # a binary routing id) must not raise and end the listener.
            decoded = [frame.decode(errors="replace") for frame in frames]
            logger.info(f"[{self.widget_name}] Received message: {decoded}")

    def poll(self):
        """Send an ``ACTION`` message carrying a greeting from this widget."""
        greeting = f"Greetings from {self.widget_name}".encode()
        self.back_socket.send_multipart([b"ACTION", greeting])

    def _register(self):
        """Send a single-frame ``REGISTER`` message on the back socket."""
        self.back_socket.send_multipart([b"REGISTER"])
# Run the message broker in the background on a separate thread.
# It is marked as a daemon so its endless receive loop does not keep the
# process alive after the napari event loop returns. The `daemon` attribute
# replaces Thread.setDaemon(), which is deprecated since Python 3.10.
message_broker = MessageBroker()
message_broker.daemon = True
message_broker.start()

# Create the viewer and dock two demo widgets on its right-hand side.
viewer = napari.Viewer()
test_widgets = [
    NapariProcessingWidget(viewer, widget_name=f"test_widget_{idx}")
    for idx in range(2)
]
for test_widget in test_widgets:
    viewer.window.add_dock_widget(test_widget, area="right")

napari.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment