Created
July 14, 2016 10:43
-
-
Save fladi/8bebdba4c47051afa7cad46e3ead6763 to your computer and use it in GitHub Desktop.
Browse Avahi services over D-Bus using PyQt5 and QtDBus
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import sys | |
import logging | |
import signal | |
from PyQt5.QtCore import QObject, QVariant, pyqtSlot, pyqtSignal | |
from PyQt5.QtWidgets import QApplication | |
from PyQt5.QtDBus import QDBus, QDBusConnection, QDBusInterface, QDBusMessage | |
from models import Server | |
# Reset SIGINT to the operating system's default action.
# NOTE(review): presumably so that Ctrl+C ends the process while the Qt event
# loop is running - confirm.
signal.signal(signal.SIGINT, signal.SIG_DFL)

# Logger shared by everything in this module, named after the module itself.
logger = logging.getLogger(__name__)
class Service(QObject):
    """Plain record describing one Avahi service entry.

    The positional order of the constructor parameters matches the order in
    which ``Discoverer`` unpacks D-Bus message arguments into it: the first
    five values are always supplied, the remaining five are only supplied
    for a resolved service and default to ``None`` otherwise.
    """

    def __init__(
        self,
        interface,
        protocol,
        name,
        stype,
        domain,
        host=None,
        aprotocol=None,
        address=None,
        port=None,
        txt=None
    ):
        """Store every argument unchanged as an attribute of the same name."""
        super(Service, self).__init__()
        self.interface = interface
        self.protocol = protocol
        self.name = name
        self.stype = stype
        self.domain = domain
        self.host = host
        self.aprotocol = aprotocol
        self.address = address
        self.port = port
        self.txt = txt

    def __str__(self):
        """Return ``'<name> (<stype>.<domain>)'``."""
        return '{s.name} ({s.stype}.{s.domain})'.format(s=self)

    def __eq__(self, other):
        """Compare two services attribute by attribute.

        Returns ``NotImplemented`` for anything that is not a ``Service`` so
        that comparisons such as ``service == None`` evaluate to ``False``
        instead of raising ``AttributeError`` on the missing ``__dict__``.
        """
        if not isinstance(other, Service):
            return NotImplemented
        return self.__dict__ == other.__dict__

    # Defining __eq__ without __hash__ already makes Python set __hash__ to
    # None; spell it out so the unhashability is visibly intentional.
    __hash__ = None
class Discoverer(QObject):
    """Browse one Avahi service type over the system D-Bus.

    On construction a ``ServiceBrowserNew`` call is made on the
    ``org.freedesktop.Avahi.Server`` interface and the browser's
    ``ItemNew``, ``ItemRemove`` and ``AllForNow`` D-Bus signals are wired to
    the slots below, which re-emit them as the Qt signals ``added``,
    ``removed`` and ``initialized``.
    """

    # Use those signals to get notified of changes in subscribed services
    # Emitted when initial scanning of avahi services is done
    initialized = pyqtSignal()
    # Emitted when a new service for our watched type is found
    added = pyqtSignal(Service)
    # Emitted when a previously announced service is reported as removed
    removed = pyqtSignal(Service)

    def __init__(self, parent, service, interface=-1, protocol=-1, domain='local'):
        """Create the Avahi service browser and subscribe to its signals.

        Args:
            parent: Parent QObject handed to ``QObject.__init__``.
            service: Service type string passed to ``ServiceBrowserNew``
                (the script entry point uses ``'_workstation._tcp'``).
            interface: Interface argument forwarded to ``ServiceBrowserNew``.
            protocol: Protocol argument forwarded to ``ServiceBrowserNew``
                and reused as the address protocol for ``ResolveService``.
            domain: Domain argument forwarded to ``ServiceBrowserNew``.

        If the browser cannot be created the error is logged and no D-Bus
        signal is connected, so none of the Qt signals will ever fire.
        """
        super(Discoverer, self).__init__(parent)
        self.protocol = protocol
        self.bus = QDBusConnection.systemBus()
        self.bus.registerObject('/', self)
        self.server = QDBusInterface(
            'org.freedesktop.Avahi',
            '/',
            'org.freedesktop.Avahi.Server',
            self.bus
        )
        reply = self.server.call(
            'ServiceBrowserNew',
            interface,
            self.protocol,
            service,
            domain,
            self._uint_flags()
        )
        reply_args = reply.arguments()
        # An error reply carries no browser object path; using its arguments
        # as a path would either raise IndexError or subscribe to nothing.
        if reply.type() == QDBusMessage.ErrorMessage or not reply_args:
            logger.error(
                'Could not create Avahi ServiceBrowser: %s (%s)',
                reply.errorMessage(),
                reply.errorName()
            )
            return
        logger.debug('New ServiceBrowser: %s', reply_args)
        browser_object_path = reply_args[0]
        subscriptions = (
            ('ItemNew', self.onItemNew),
            ('ItemRemove', self.onItemRemove),
            ('AllForNow', self.onAllForNow),
        )
        for signal_name, slot in subscriptions:
            connected = self.bus.connect(
                'org.freedesktop.Avahi',
                browser_object_path,
                'org.freedesktop.Avahi.ServiceBrowser',
                signal_name,
                slot
            )
            if not connected:
                logger.warning(
                    'Could not connect to ServiceBrowser signal %s',
                    signal_name
                )

    @staticmethod
    def _uint_flags():
        """Return a QVariant holding 0 converted to an unsigned integer."""
        flags = QVariant(0)
        flags.convert(QVariant.UInt)
        return flags

    @pyqtSlot(QDBusMessage)
    def onItemNew(self, msg):
        """Resolve a newly announced service and emit ``added``.

        The first five arguments of the ``ItemNew`` message, followed by
        ``self.protocol`` and a zero flags value, are sent to
        ``ResolveService``; the first ten values of the reply are used to
        build the emitted ``Service``.
        """
        logger.debug('Avahi service discovered: %s', msg.arguments())
        reply = self.server.callWithArgumentList(
            QDBus.AutoDetect,
            'ResolveService',
            [
                *msg.arguments()[:5],
                self.protocol,
                self._uint_flags()
            ]
        )
        # A failed resolve does not carry the service fields, so building a
        # Service from it would raise inside this slot; log and skip instead.
        if reply.type() == QDBusMessage.ErrorMessage:
            logger.warning(
                'Could not resolve Avahi service %s: %s (%s)',
                msg.arguments(),
                reply.errorMessage(),
                reply.errorName()
            )
            return
        resolved = reply.arguments()
        logger.debug('Avahi service resolved: %s', resolved)
        service = Service(*resolved[:10])
        self.added.emit(service)

    @pyqtSlot(QDBusMessage)
    def onItemRemove(self, msg):
        """Emit ``removed`` for a service reported by ``ItemRemove``.

        Only the first five message arguments are used, so the emitted
        ``Service`` has ``host``, ``aprotocol``, ``address``, ``port`` and
        ``txt`` left at ``None``. Because ``Service.__eq__`` compares all
        attributes, it will generally not compare equal to the resolved
        instance emitted by ``added``; match on the first five fields.
        """
        arguments = msg.arguments()
        logger.debug('Avahi service removed: %s', arguments)
        service = Service(*arguments[:5])
        self.removed.emit(service)

    @pyqtSlot(QDBusMessage)
    def onAllForNow(self, msg):
        """Emit ``initialized`` when the browser sends ``AllForNow``."""
        logger.debug('Avahi emitted all signals for discovered peers')
        self.initialized.emit()
# Script entry point: only runs when the file is executed directly.
if __name__ == '__main__':
    # Show everything from DEBUG level upwards
    logging.basicConfig(level=logging.DEBUG)

    # The Qt application object that owns the event loop
    application = QApplication(sys.argv)

    # Start browsing for '_workstation._tcp' services; the application is
    # passed as the discoverer's parent and the name keeps a reference to it
    discoverer = Discoverer(application, '_workstation._tcp')

    # Run the event loop and hand its return value back as the exit status
    exit_code = application.exec_()
    sys.exit(exit_code)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment