Skip to content

Instantly share code, notes, and snippets.

@ivanalejandro0
Last active July 5, 2021 03:04
Show Gist options
  • Save ivanalejandro0/dba1dd504edbc2c32e83 to your computer and use it in GitHub Desktop.
Save ivanalejandro0/dba1dd504edbc2c32e83 to your computer and use it in GitHub Desktop.
PyQt5 & ZMQ integration
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import signal
import sys
import uuid
import zmq
from PyQt5.QtWidgets import QApplication, QWidget
from PyQt5.QtWidgets import QPlainTextEdit, QPushButton
from PyQt5.QtWidgets import QGridLayout
from PyQt5.QtCore import QSocketNotifier
ENDPOINT = "ipc://routing.ipc"
class Client():
def __init__(self):
context = zmq.Context.instance()
client = context.socket(zmq.DEALER)
client.setsockopt(zmq.IDENTITY, b'QtClient')
client.connect(ENDPOINT)
self.socket = client
def dispatch(self, msg):
msg = bytes(msg, 'utf-8')
uid = uuid.uuid4().bytes
self.socket.send_multipart([uid, msg])
return uid
def recv(self):
return self.socket.recv_multipart()
class Example(QWidget):
def __init__(self):
super().__init__()
self.initUI()
self._client = Client()
socket = self._client.socket
self._notifier = QSocketNotifier(socket.getsockopt(zmq.FD),
QSocketNotifier.Read, self)
self._notifier.activated.connect(self._socket_activity)
self._counter = 0
def initUI(self):
self.qpteText = QPlainTextEdit(self)
btn = QPushButton("Send something")
btn.clicked.connect(self._send_data)
layout = QGridLayout()
layout.addWidget(self.qpteText, 1, 1)
layout.addWidget(btn, 2, 1)
self.setLayout(layout)
self.setGeometry(200, 200, 480, 400)
self.setWindowTitle('QT & ZMQ integration test')
self.show()
self._log('[UI] started')
def _log(self, data):
text = self.qpteText.toPlainText()
self.qpteText.setPlainText(text + data + '\n')
def _send_data(self):
msg = "Test message #" + str(self._counter)
self._client.dispatch(msg)
self._log("[UI] sent: " + msg)
self._counter += 1
def _socket_activity(self):
self._notifier.setEnabled(False)
flags = self._client.socket.getsockopt(zmq.EVENTS)
self._log("[Socket] socket.getsockopt(zmq.EVENTS): " + repr(flags))
if flags & zmq.POLLIN:
received = self._client.recv()
self._log("[Socket] zmq.POLLIN")
self._log("[Socket] received: " + repr(received))
elif flags & zmq.POLLOUT:
self._log("[Socket] zmq.POLLOUT")
elif flags & zmq.POLLERR:
self._log("[Socket] zmq.POLLERR")
else:
self._log("[Socket] FAILURE")
self._notifier.setEnabled(True)
# I have no idea why I need this here, but it won't work more than once
# if this is not used
flags = self._client.socket.getsockopt(zmq.EVENTS)
self._log("[Socket] socket.getsockopt(zmq.EVENTS): " + repr(flags))
if __name__ == '__main__':
app = QApplication(sys.argv)
w = Example()
# Ensure that the application quits using CTRL-C
signal.signal(signal.SIGINT, signal.SIG_DFL)
sys.exit(app.exec_())
#!/bin/bash
if [[ -z $1 ]]; then
echo "You need to specify a python3 virtualenv path parameter."
echo "You should NOT be sourced inside that venv"
fi
libs=( PyQt5 sip.cpython-34m-x86_64-linux-gnu.so )
get_python_lib_cmd="from distutils.sysconfig import get_python_lib; print (get_python_lib())"
lib_system_path=$(python3 -c "$get_python_lib_cmd")
source $1/bin/activate
lib_virtualenv_path=$(python3 -c "$get_python_lib_cmd")
for lib in ${libs[@]}
do
ln -s $lib_system_path/$lib $lib_virtualenv_path/$lib
don
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import random
import zmq
ENDPOINT = "ipc://routing.ipc"
def main():
context = zmq.Context.instance()
worker = context.socket(zmq.ROUTER)
worker.bind(ENDPOINT)
while True:
request = worker.recv_multipart()
print("[Worker] received request: ", request)
client_id, msg_id, msg = request
if random.choice([True, True, False]): # should I respond?
response = msg + b'(received)'
worker.send_multipart([client_id, msg_id, response])
print("[Worker] response sent: ", [client_id, msg_id, response])
if msg == b"END":
break
if __name__ == "__main__":
main()
@ivanalejandro0
Copy link
Author

Steps to get this working:

  • Install PyQt5 system wide
  • Create a python3 virtualenv
  • use the pyqt5-to-virtualenv.sh script to symlink the global pyqt into the venv
  • install, inside the virtualenv, pyzmq
  • run in a terminal the worker.py
  • in a different terminal run the example.py UI

@jboulton
Copy link

jboulton commented Jul 5, 2021

In your _socket_activity method you need to put the recv in a while loop as there could be more than one message in the queue something like:

    def _socket_activity(self):
        self._notifier.setEnabled(False)

        flags = self._client.socket.getsockopt(zmq.EVENTS)
        self._log("[Socket] socket.getsockopt(zmq.EVENTS): " + repr(flags))
        
        while flags:
            if flags & zmq.POLLIN:
                received = self._client.recv()
                self._log("[Socket] zmq.POLLIN")
                self._log("[Socket] received: " + repr(received))
            elif flags & zmq.POLLOUT:
                self._log("[Socket] zmq.POLLOUT")
            elif flags & zmq.POLLERR:
                self._log("[Socket] zmq.POLLERR")
            else:
                self._log("[Socket] FAILURE")
            self._notifier.setEnabled(True)

            # Check for more messages
            flags = self._client.socket.getsockopt(zmq.EVENTS)
            self._log("[Socket] socket.getsockopt(zmq.EVENTS): " + repr(flags))

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment