Skip to content

Instantly share code, notes, and snippets.

@rickardp
Created May 15, 2014 21:34
Show Gist options
  • Save rickardp/2da1bf478de759cb2e15 to your computer and use it in GitHub Desktop.
Save rickardp/2da1bf478de759cb2e15 to your computer and use it in GitHub Desktop.
pika QtConnection -- async RabbitMQ/pika with the Qt run loop
"""Use pika with the Qt socket notifier.

To use:

    import qt_connection

then, instead of e.g. pika.SelectConnection(...),
use qt_connection.QtConnection(appObject, ...).
"""
import logging
import time
from pika.adapters import base_connection
from PySide.QtCore import QSocketNotifier, QTimer
# Module-level logger named after this module.
LOGGER = logging.getLogger(__name__)
# Event bit flags: they are tested against an event-state mask with `&`
# and passed to the event handler to say which kind of activity occurred.
# Use epoll's constants to keep life easy
READ = 0x0001
WRITE = 0x0004
ERROR = 0x0008
class QtConnection(base_connection.BaseConnection):
    """pika connection adapter that is driven by the Qt event loop.

    Socket activity and timeouts are delivered through an ``IOLoop``
    object (Qt socket notifiers and timers) rather than a blocking
    select/poll loop.
    """

    # NOTE(review): not read anywhere in this file; presumably consulted
    # by pika itself -- confirm against the pika version in use.
    WARN_ABOUT_IOLOOP = False

    def __init__(self, parent, parameters=None,
                 on_open_callback=None,
                 on_open_error_callback=None,
                 on_close_callback=None,
                 stop_ioloop_on_close=False,
                 custom_ioloop=None):
        """Create a new instance of the QtConnection class.

        Connection start-up itself is handled by the pika base class.

        :param parent: Qt parent object handed to the ``IOLoop`` that is
            created when no ``custom_ioloop`` is supplied
        :param pika.connection.Parameters parameters: Connection parameters
        :param on_open_callback: The method to call when the connection is
            open
        :type on_open_callback: method
        :param on_open_error_callback: Method to call if the connection
            can't be opened
        :type on_open_error_callback: method
        :param on_close_callback: Method to call when the connection is
            closed
        :type on_close_callback: method
        :param bool stop_ioloop_on_close: Call ioloop.stop() if
            disconnected. The ``IOLoop`` class in this module defines no
            ``stop()``, so only enable this with a ``custom_ioloop`` that
            does.
        :param custom_ioloop: Loop object to use instead of creating an
            ``IOLoop``. It must offer the same methods this adapter calls
            on its loop (``create_notifier`` is called from
            ``_adapter_connect``).

        """
        # NOTE(review): sleep_counter is not read in this file; kept in
        # case external code relies on the attribute.
        self.sleep_counter = 0
        # ``custom_ioloop`` used to be accepted but silently ignored; it is
        # now honoured, falling back to a Qt-backed loop when omitted.
        if custom_ioloop is None:
            custom_ioloop = IOLoop(parent)
        self.ioloop = custom_ioloop
        super(QtConnection, self).__init__(parameters,
                                           on_open_callback,
                                           on_open_error_callback,
                                           on_close_callback,
                                           self.ioloop,
                                           stop_ioloop_on_close)

    def _adapter_connect(self):
        """Connect to the remote socket, adding the socket to the IOLoop if
        connected.

        A truthy result from the base class is treated as success; only
        then is the socket registered with the loop.

        :rtype: bool

        """
        if super(QtConnection, self)._adapter_connect():
            self.ioloop.create_notifier(self.socket.fileno(),
                                        self._handle_events,
                                        self.event_state)
            return True
        return False
class IOLoop(object):
    """Minimal ioloop replacement built on Qt socket notifiers and timers.

    One ``QSocketNotifier`` per event kind (read, write, exception) watches
    a single socket; timeouts are single-shot ``QTimer`` objects. All Qt
    objects are created with ``parent`` as their Qt parent.
    """

    def __init__(self, parent):
        """Remember the Qt parent; no notifiers exist until
        ``create_notifier`` is called.

        :param parent: Qt parent object for the notifiers and timers
        """
        self.parent = parent
        self.read_notifier = None
        self.write_notifier = None
        self.exception_notifier = None

    def _discard_notifiers(self):
        """Disable and schedule deletion of any existing notifiers."""
        stale = (self.read_notifier,
                 self.write_notifier,
                 self.exception_notifier)
        for notifier in stale:
            if notifier is not None:
                notifier.setEnabled(False)
                notifier.deleteLater()
        self.read_notifier = None
        self.write_notifier = None
        self.exception_notifier = None

    def create_notifier(self, socketno, handler, event_state):
        """Start watching a socket and route its activity to ``handler``.

        :param int socketno: The file descriptor to watch
        :param handler: Called as ``handler(socketno, flag)`` where flag is
            READ, WRITE or ERROR
        :param int event_state: Bit mask of the events to enable initially
        """
        # Drop notifiers left over from an earlier call so they do not
        # keep firing for a descriptor that is no longer in use.
        self._discard_notifiers()
        self.read_notifier = QSocketNotifier(
            socketno, QSocketNotifier.Read, parent=self.parent)
        self.write_notifier = QSocketNotifier(
            socketno, QSocketNotifier.Write, parent=self.parent)
        self.exception_notifier = QSocketNotifier(
            socketno, QSocketNotifier.Exception, parent=self.parent)
        self.read_notifier.activated.connect(lambda: handler(socketno, READ))
        self.write_notifier.activated.connect(lambda: handler(socketno, WRITE))
        self.exception_notifier.activated.connect(lambda: handler(socketno, ERROR))
        # Apply the mask only now that the notifiers exist. Doing it first
        # (as before) was a no-op and left every notifier enabled, so the
        # write notifier fired even when WRITE was not requested.
        self.update_handler(socketno, event_state)

    def update_handler(self, socketno, event_state):
        """Pass in the events to process for the given file descriptor.

        Does nothing for notifiers that have not been created yet.

        :param int socketno: The file descriptor to poll for (unused; a
            single socket is watched)
        :param int event_state: The events to handle, as a bit mask
        """
        if self.read_notifier is not None:
            self.read_notifier.setEnabled(bool(event_state & READ))
        if self.write_notifier is not None:
            self.write_notifier.setEnabled(bool(event_state & WRITE))
        if self.exception_notifier is not None:
            self.exception_notifier.setEnabled(bool(event_state & ERROR))

    def add_timeout(self, deadline, callback_method):
        """Call ``callback_method`` once after ``deadline`` seconds.

        :param deadline: Delay in seconds; negative values are treated as 0
        :param callback_method: Callable connected to the timer's timeout
        :returns: The timer, usable as the handle for ``remove_timeout``
        """
        # NOTE(review): fired timers stay parented to ``self.parent`` and
        # are not deleted here -- confirm whether that needs cleaning up.
        timer = QTimer(self.parent)
        # QTimer intervals are in milliseconds.
        timer.setInterval(max(0, int(deadline * 1000)))
        timer.setSingleShot(True)
        timer.timeout.connect(callback_method)
        timer.start()
        return timer

    def remove_timeout(self, timeout_id):
        """Cancel a timeout previously returned by ``add_timeout``.

        :param timeout_id: The handle returned by ``add_timeout``; ``None``
            is ignored
        """
        if timeout_id is not None:
            timeout_id.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment