Last active
July 5, 2021 03:04
-
-
Save ivanalejandro0/dba1dd504edbc2c32e83 to your computer and use it in GitHub Desktop.
PyQt5 & ZMQ integration
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import signal | |
import sys | |
import uuid | |
import zmq | |
from PyQt5.QtWidgets import QApplication, QWidget | |
from PyQt5.QtWidgets import QPlainTextEdit, QPushButton | |
from PyQt5.QtWidgets import QGridLayout | |
from PyQt5.QtCore import QSocketNotifier | |
ENDPOINT = "ipc://routing.ipc" | |
class Client:
    """Thin wrapper around a ZMQ DEALER socket connected to ENDPOINT.

    The underlying socket is exposed as ``self.socket`` so callers can query
    its options (for example its file descriptor or pending events).
    """

    def __init__(self):
        # Use the process-wide context and a fixed identity for this client.
        dealer = zmq.Context.instance().socket(zmq.DEALER)
        dealer.setsockopt(zmq.IDENTITY, b'QtClient')
        dealer.connect(ENDPOINT)
        self.socket = dealer

    def dispatch(self, msg):
        """Send the text *msg* and return the random id attached to it.

        The message goes out as two frames: a fresh UUID4 (raw bytes)
        followed by *msg* encoded as UTF-8.
        """
        payload = bytes(msg, 'utf-8')
        message_id = uuid.uuid4().bytes
        self.socket.send_multipart([message_id, payload])
        return message_id

    def recv(self):
        """Receive one multipart message and return its list of frames."""
        frames = self.socket.recv_multipart()
        return frames
class Example(QWidget):
    """Small window that sends test messages over ZMQ and logs the replies.

    The ZMQ socket is integrated into the Qt event loop through a
    QSocketNotifier watching the socket's file descriptor (``zmq.FD``).
    """

    def __init__(self):
        super().__init__()
        self.initUI()

        self._client = Client()
        self._counter = 0  # number of test messages sent so far

        # Let Qt wake us up whenever the ZMQ descriptor becomes readable.
        fd = self._client.socket.getsockopt(zmq.FD)
        self._notifier = QSocketNotifier(fd, QSocketNotifier.Read, self)
        self._notifier.activated.connect(self._socket_activity)

    def initUI(self):
        """Build the widgets, lay them out and show the window."""
        self.qpteText = QPlainTextEdit(self)

        btn = QPushButton("Send something")
        btn.clicked.connect(self._send_data)

        layout = QGridLayout()
        layout.addWidget(self.qpteText, 1, 1)
        layout.addWidget(btn, 2, 1)
        self.setLayout(layout)

        self.setGeometry(200, 200, 480, 400)
        self.setWindowTitle('QT & ZMQ integration test')
        self.show()
        self._log('[UI] started')

    def _log(self, data):
        """Append *data* as a new line of the on-screen log."""
        # appendPlainText adds a new paragraph without re-reading and
        # re-setting the whole document on every log line.
        self.qpteText.appendPlainText(data)

    def _send_data(self):
        """Send the next numbered test message and log it."""
        msg = "Test message #" + str(self._counter)
        self._client.dispatch(msg)
        self._log("[UI] sent: " + msg)
        self._counter += 1
        # The ZMQ descriptor is edge-triggered: after a send the socket may
        # have become readable without the descriptor signalling it, so
        # check for pending messages explicitly.
        self._drain_incoming()

    def _drain_incoming(self):
        """Receive and log every message currently queued on the socket."""
        socket = self._client.socket
        # Loop until ZMQ_EVENTS no longer reports POLLIN. The final
        # getsockopt call also makes ZMQ process its pending state, which is
        # what re-arms the edge-triggered descriptor for the next wake-up.
        while socket.getsockopt(zmq.EVENTS) & zmq.POLLIN:
            received = self._client.recv()
            self._log("[Socket] received: " + repr(received))

    def _socket_activity(self):
        """Handle a wake-up from the notifier watching the ZMQ descriptor."""
        # Avoid re-entrant notifications while the socket is being serviced.
        self._notifier.setEnabled(False)
        try:
            flags = self._client.socket.getsockopt(zmq.EVENTS)
            self._log("[Socket] socket.getsockopt(zmq.EVENTS): " + repr(flags))

            if flags & zmq.POLLIN:
                self._log("[Socket] zmq.POLLIN")
                # More than one message may be queued per wake-up.
                self._drain_incoming()
            elif flags & zmq.POLLOUT:
                self._log("[Socket] zmq.POLLOUT")
            elif flags & zmq.POLLERR:
                self._log("[Socket] zmq.POLLERR")
            else:
                # No event flag set: the descriptor fired although there is
                # nothing to read or write.
                self._log("[Socket] FAILURE")
        finally:
            # Always re-enable, otherwise one exception would silence the
            # socket for the rest of the session.
            self._notifier.setEnabled(True)
if __name__ == '__main__':
    application = QApplication(sys.argv)
    # Keep a reference to the window for the lifetime of the event loop.
    window = Example()

    # Restore the default SIGINT action so that CTRL-C quits the application.
    signal.signal(signal.SIGINT, signal.SIG_DFL)

    exit_code = application.exec_()
    sys.exit(exit_code)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# Symlink the system-wide PyQt5 package and its sip extension module into the
# site-packages directory of a Python 3 virtualenv.
#
# Usage: pyqt5-to-virtualenv.sh /path/to/virtualenv
# Run it from a shell where that virtualenv is NOT activated, so that the
# first python3 lookup below resolves to the system interpreter.

if [[ -z $1 ]]; then
    echo "You need to specify a python3 virtualenv path parameter."
    echo "You should NOT be sourced inside that venv"
    # Without a path there is nothing to activate; stop here.
    exit 1
fi

# NOTE: the sip file name is specific to CPython 3.4 on x86_64 Linux; adjust
# it for other interpreter versions or platforms.
libs=( PyQt5 sip.cpython-34m-x86_64-linux-gnu.so )

get_python_lib_cmd="from distutils.sysconfig import get_python_lib; print (get_python_lib())"

# Library directory of the system interpreter (queried before activation).
lib_system_path=$(python3 -c "$get_python_lib_cmd")

source "$1/bin/activate"

# Library directory of the virtualenv interpreter (queried after activation).
lib_virtualenv_path=$(python3 -c "$get_python_lib_cmd")

for lib in "${libs[@]}"
do
    ln -s "$lib_system_path/$lib" "$lib_virtualenv_path/$lib"
done
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import random | |
import zmq | |
ENDPOINT = "ipc://routing.ipc" | |
def main():
    """Run a ROUTER worker that answers roughly two out of three requests.

    Each request is expected to have three frames: the client identity
    (added by the ROUTER socket), a message id and the message body. The
    reply echoes the id and appends ``b'(received)'`` to the body. The loop
    ends after a request whose body is ``b"END"``.
    """
    context = zmq.Context.instance()
    worker = context.socket(zmq.ROUTER)
    try:
        worker.bind(ENDPOINT)

        while True:
            request = worker.recv_multipart()
            print("[Worker] received request: ", request)

            # A peer may send any number of frames; skip anything that does
            # not match the expected shape instead of crashing the worker.
            if len(request) != 3:
                print("[Worker] malformed request dropped: ", request)
                continue
            client_id, msg_id, msg = request

            if random.choice([True, True, False]):  # should I respond?
                response = msg + b'(received)'
                worker.send_multipart([client_id, msg_id, response])
                print("[Worker] response sent: ",
                      [client_id, msg_id, response])

            if msg == b"END":
                break
    finally:
        # Release the socket on normal exit and on errors alike.
        worker.close()
if __name__ == "__main__":
    # main() returns None, so this exits with status 0 once the loop ends.
    raise SystemExit(main())
In your _socket_activity method you need to put the recv in a while loop, as there could be more than one message in the queue. Something like:
def _socket_activity(self):
    """Handle a notifier wake-up, receiving every queued message."""
    self._notifier.setEnabled(False)

    flags = self._client.socket.getsockopt(zmq.EVENTS)
    self._log("[Socket] socket.getsockopt(zmq.EVENTS): " + repr(flags))

    if not flags & zmq.POLLIN:
        if flags & zmq.POLLOUT:
            self._log("[Socket] zmq.POLLOUT")
        elif flags & zmq.POLLERR:
            self._log("[Socket] zmq.POLLERR")
        else:
            self._log("[Socket] FAILURE")

    # Loop on POLLIN only: a connected socket usually keeps POLLOUT set, so
    # looping while any flag is set would never terminate.
    while flags & zmq.POLLIN:
        received = self._client.recv()
        self._log("[Socket] zmq.POLLIN")
        self._log("[Socket] received: " + repr(received))

        # Check for more messages
        flags = self._client.socket.getsockopt(zmq.EVENTS)
        self._log("[Socket] socket.getsockopt(zmq.EVENTS): " + repr(flags))

    self._notifier.setEnabled(True)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Steps to get this working:
pyqt5-to-virtualenv.sh
script to symlink the global PyQt5 into the venv
pyzmq
worker.py
example.py
UI