Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Browse Avahi services over D-Bus using PyQt5 and QtDBus
#!/usr/bin/python3
import sys
import logging
import signal
from PyQt5.QtCore import QObject, QVariant, pyqtSlot, pyqtSignal
from PyQt5.QtWidgets import QApplication
from PyQt5.QtDBus import QDBus, QDBusConnection, QDBusInterface, QDBusMessage
from models import Server
# Module-level logger, named after this module
logger = logging.getLogger(__name__)
# Restore the default OS action for SIGINT in place of Python's own
# handler -- presumably so that Ctrl+C terminates the process even while
# the Qt event loop is running (TODO confirm intent)
signal.signal(signal.SIGINT, signal.SIG_DFL)
class Service(QObject):
    """One Avahi service record, as delivered over D-Bus.

    The first five fields (interface, protocol, name, stype, domain) are
    always present; they are what both ``ItemNew`` and ``ItemRemove``
    carry.  The remaining fields are only filled in once the service has
    been resolved and default to ``None`` otherwise.

    Two services compare equal when their five identifying fields match,
    regardless of the resolved details.  This lets a ``Service`` built from
    an ``ItemRemove`` signal (five fields only) match the fully resolved
    ``Service`` that was announced earlier.

    Args:
        interface: Interface value reported by Avahi (passed through as-is).
        protocol: Protocol value reported by Avahi (passed through as-is).
        name: Service instance name.
        stype: Service type, e.g. ``'_workstation._tcp'``.
        domain: Domain the service was found in.
        host: Resolved host name, if any.
        aprotocol: Protocol of the resolved address, if any.
        address: Resolved address, if any.
        port: Resolved port, if any.
        txt: Resolved TXT record data, if any.
    """

    def __init__(
        self,
        interface,
        protocol,
        name,
        stype,
        domain,
        host=None,
        aprotocol=None,
        address=None,
        port=None,
        txt=None
    ):
        super(Service, self).__init__()
        self.interface = interface
        self.protocol = protocol
        self.name = name
        self.stype = stype
        self.domain = domain
        self.host = host
        self.aprotocol = aprotocol
        self.address = address
        self.port = port
        self.txt = txt

    def _identity(self):
        """Return the tuple of fields that identify this service."""
        return (
            self.interface,
            self.protocol,
            self.name,
            self.stype,
            self.domain,
        )

    def __str__(self):
        return '{s.name} ({s.stype}.{s.domain})'.format(s=self)

    def __eq__(self, other):
        # Let Python fall back to its default handling for foreign types
        # instead of failing on a missing ``__dict__`` (e.g. ``None``).
        if not isinstance(other, Service):
            return NotImplemented
        return self._identity() == other._identity()

    def __hash__(self):
        # Defining __eq__ alone would make instances unhashable; hash on
        # the same fields that equality uses so the two stay consistent.
        return hash(self._identity())
class Discoverer(QObject):
    """Browse one Avahi service type over the system D-Bus.

    On construction a ``ServiceBrowser`` is requested from the Avahi server
    and its ``ItemNew`` / ``ItemRemove`` / ``AllForNow`` D-Bus signals are
    hooked up to the slots below, which re-emit them as Qt signals.

    If the browser cannot be created (for instance because the Avahi
    daemon is not reachable) the failure is logged and no signals will
    ever be emitted.

    Args:
        parent: Qt parent object.
        service: Service type to watch, e.g. ``'_workstation._tcp'``.
        interface: Interface argument forwarded to ``ServiceBrowserNew``
            (-1 presumably means "unspecified" -- confirm against Avahi).
        protocol: Protocol argument forwarded to ``ServiceBrowserNew`` and
            reused as the address protocol when resolving (-1 presumably
            means "unspecified" -- confirm against Avahi).
        domain: Domain to browse.
    """

    # Use those signals to get notified of changes in subscribed services
    # Emitted when initial scanning of avahi services is done
    initialized = pyqtSignal()
    # Emitted when a new service for our watched type is found and resolved
    added = pyqtSignal(Service)
    # Emitted when a service of our watched type disappears; the Service
    # carries only its five identifying fields
    removed = pyqtSignal(Service)

    def __init__(self, parent, service, interface=-1, protocol=-1, domain='local'):
        super(Discoverer, self).__init__(parent)
        self.protocol = protocol
        self.bus = QDBusConnection.systemBus()
        self.bus.registerObject('/', self)
        self.server = QDBusInterface(
            'org.freedesktop.Avahi',
            '/',
            'org.freedesktop.Avahi.Server',
            self.bus
        )
        reply = self.server.call(
            'ServiceBrowserNew',
            interface,
            self.protocol,
            service,
            domain,
            self._uint_flags()
        )
        # An error reply carries no browser path; connecting signals to it
        # would silently do nothing, so report the problem and stop here.
        if reply.type() != QDBusMessage.ReplyMessage or not reply.arguments():
            logger.error(
                'Could not create Avahi ServiceBrowser: {}'.format(
                    reply.errorMessage()
                )
            )
            return
        logger.debug('New ServiceBrowser: {}'.format(reply.arguments()))
        browser_path = reply.arguments()[0]
        handlers = (
            ('ItemNew', self.onItemNew),
            ('ItemRemove', self.onItemRemove),
            ('AllForNow', self.onAllForNow),
        )
        for signal_name, slot in handlers:
            connected = self.bus.connect(
                'org.freedesktop.Avahi',
                browser_path,
                'org.freedesktop.Avahi.ServiceBrowser',
                signal_name,
                slot
            )
            if not connected:
                logger.warning(
                    'Could not connect to Avahi signal {}'.format(signal_name)
                )

    @staticmethod
    def _uint_flags(value=0):
        """Return *value* wrapped in a QVariant converted to UInt."""
        flags = QVariant(value)
        flags.convert(QVariant.UInt)
        return flags

    @pyqtSlot(QDBusMessage)
    def onItemNew(self, msg):
        """Resolve a newly announced service and emit ``added``."""
        logger.debug('Avahi service discovered: {}'.format(msg.arguments()))
        reply = self.server.callWithArgumentList(
            QDBus.AutoDetect,
            'ResolveService',
            [
                *msg.arguments()[:5],
                self.protocol,
                self._uint_flags()
            ]
        )
        resolved = reply.arguments()
        # A failed resolve yields an error reply without the service
        # fields; building a Service from it would raise inside this slot.
        if reply.type() != QDBusMessage.ReplyMessage or len(resolved) < 5:
            logger.warning(
                'Avahi service resolution failed: {}'.format(
                    reply.errorMessage()
                )
            )
            return
        logger.debug('Avahi service resolved: {}'.format(resolved))
        service = Service(*resolved[:10])
        self.added.emit(service)

    @pyqtSlot(QDBusMessage)
    def onItemRemove(self, msg):
        """Emit ``removed`` with a Service built from the signal's fields."""
        arguments = msg.arguments()
        logger.debug('Avahi service removed: {}'.format(arguments))
        # Only the five identifying fields are used; no resolve is done
        service = Service(*arguments[:5])
        self.removed.emit(service)

    @pyqtSlot(QDBusMessage)
    def onAllForNow(self, msg):
        """Emit ``initialized`` once Avahi reports the initial scan is done."""
        logger.debug('Avahi emitted all signals for discovered peers')
        self.initialized.emit()
# Script entry point: browse for workstation services until the app quits
if __name__ == '__main__':
    # Show every debug message emitted while browsing
    logging.basicConfig(level=logging.DEBUG)

    # The Qt application owns the event loop that delivers D-Bus signals
    qt_app = QApplication(sys.argv)

    # Parented to the application; keep a name bound to it while running
    browser = Discoverer(qt_app, '_workstation._tcp')

    # Run the event loop and pass its return code on as the exit status
    exit_code = qt_app.exec_()
    sys.exit(exit_code)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment