Created
October 6, 2018 16:04
-
-
Save Hacksore/b8ba1b9f378be6f806eee6615790bd96 to your computer and use it in GitHub Desktop.
python bluetooth metadata with dbus
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import time | |
import signal | |
import dbus | |
import dbus.service | |
import dbus.mainloop.glib | |
import gobject | |
import logging | |
import hashlib | |
import os | |
import json | |
SERVICE_NAME = "org.bluez" | |
AGENT_IFACE = SERVICE_NAME + '.Agent1' | |
ADAPTER_IFACE = SERVICE_NAME + ".Adapter1" | |
DEVICE_IFACE = SERVICE_NAME + ".Device1" | |
PLAYER_IFACE = SERVICE_NAME + '.MediaPlayer1' | |
TRANSPORT_IFACE = SERVICE_NAME + '.MediaTransport1' | |
LOG_LEVEL = logging.INFO | |
#LOG_LEVEL = logging.DEBUG | |
LOG_FILE = "/dev/stdout" | |
LOG_FORMAT = "%(asctime)s %(levelname)s %(message)s" | |
def getManagedObjects(): | |
bus = dbus.SystemBus() | |
manager = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager") | |
return manager.GetManagedObjects() | |
def findAdapter(): | |
objects = getManagedObjects(); | |
bus = dbus.SystemBus() | |
for path, ifaces in objects.iteritems(): | |
adapter = ifaces.get(ADAPTER_IFACE) | |
if adapter is None: | |
continue | |
obj = bus.get_object(SERVICE_NAME, path) | |
return dbus.Interface(obj, ADAPTER_IFACE) | |
raise Exception("Bluetooth adapter not found") | |
class BluePlayer(dbus.service.Object): | |
AGENT_PATH = "/blueplayer/agent" | |
CAPABILITY = "DisplayOnly" | |
bus = None | |
adapter = None | |
device = None | |
deviceAlias = None | |
player = None | |
transport = None | |
connected = None | |
state = None | |
status = None | |
discoverable = None | |
track = { | |
"artist": None, | |
"title": None, | |
"checksum": None | |
} | |
def __init__(self): | |
self.bus = dbus.SystemBus() | |
dbus.service.Object.__init__(self, dbus.SystemBus(), BluePlayer.AGENT_PATH) | |
self.bus.add_signal_receiver(self.playerHandler, | |
bus_name="org.bluez", | |
dbus_interface="org.freedesktop.DBus.Properties", | |
signal_name="PropertiesChanged", | |
path_keyword="path") | |
self.registerAgent() | |
self.findPlayer() | |
def start(self): | |
try: | |
mainloop = gobject.MainLoop() | |
mainloop.run() | |
except: | |
self.end() | |
def findPlayer(self): | |
"""Find any current media players and associated device""" | |
manager = dbus.Interface(self.bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager") | |
objects = manager.GetManagedObjects() | |
player_path = None | |
transport_path = None | |
for path, interfaces in objects.iteritems(): | |
if PLAYER_IFACE in interfaces: | |
player_path = path | |
if TRANSPORT_IFACE in interfaces: | |
transport_path = path | |
if player_path: | |
self.connected = True | |
self.getPlayer(player_path) | |
player_properties = self.player.GetAll(PLAYER_IFACE, dbus_interface="org.freedesktop.DBus.Properties") | |
if "Status" in player_properties: | |
self.status = player_properties["Status"] | |
if "Track" in player_properties: | |
self.track = player_properties["Track"] | |
else: | |
logging.debug("Could not find player") | |
if transport_path: | |
self.transport = self.bus.get_object("org.bluez", transport_path) | |
transport_properties = self.transport.GetAll(TRANSPORT_IFACE, dbus_interface="org.freedesktop.DBus.Properties") | |
if "State" in transport_properties: | |
self.state = transport_properties["State"] | |
def getPlayer(self, path): | |
self.player = self.bus.get_object("org.bluez", path) | |
device_path = self.player.Get("org.bluez.MediaPlayer1", "Device", dbus_interface="org.freedesktop.DBus.Properties") | |
self.getDevice(device_path) | |
def getDevice(self, path): | |
self.device = self.bus.get_object("org.bluez", path) | |
self.deviceAlias = self.device.Get(DEVICE_IFACE, "Alias", dbus_interface="org.freedesktop.DBus.Properties") | |
def playerHandler(self, interface, changed, invalidated, path): | |
iface = interface[interface.rfind(".") + 1:] | |
if iface == "MediaPlayer1": | |
if "Track" in changed: | |
if("Title" in changed["Track"]): | |
self.onTrackChange(changed["Track"]) | |
if "Status" in changed: | |
self.status = (changed["Status"]) | |
def onTrackChange(self, changed): | |
m = hashlib.md5() | |
title = changed["Title"] | |
artist = changed["Artist"] | |
checksum = (title + "" + artist).replace(" ", "") | |
if(changed != None and checksum != self.track.get("checksum")): | |
self.track = { | |
"title": title + "", | |
"artist": artist + "", | |
"checksum": checksum | |
} | |
jsonString = json.dumps(self.track, indent=4) | |
trackFile = open("test.json", "wb") | |
trackFile.write(jsonString.encode("utf-8", "replace")) | |
trackFile.close() | |
#os.system("echo '" + json.dumps(self.track) + "' > test.json") | |
print(jsonString) | |
def next(self): | |
self.player.Next(dbus_interface=PLAYER_IFACE) | |
def previous(self): | |
self.player.Previous(dbus_interface=PLAYER_IFACE) | |
def play(self): | |
self.player.Play(dbus_interface=PLAYER_IFACE) | |
def pause(self): | |
self.player.Pause(dbus_interface=PLAYER_IFACE) | |
def volumeUp(self): | |
self.control.VolumeUp(dbus_interface=CONTROL_IFACE) | |
self.transport.VolumeUp(dbus_interface=TRANSPORT_IFACE) | |
def shutdown(self): | |
print(1) | |
def getStatus(self): | |
return self.status | |
def registerAgent(self): | |
"""Register BluePlayer as the default agent""" | |
manager = dbus.Interface(self.bus.get_object(SERVICE_NAME, "/org/bluez"), "org.bluez.AgentManager1") | |
manager.RegisterAgent(BluePlayer.AGENT_PATH, BluePlayer.CAPABILITY) | |
manager.RequestDefaultAgent(BluePlayer.AGENT_PATH) | |
logging.debug("Blueplayer is registered as a default agent") | |
logging.basicConfig(filename=LOG_FILE, format=LOG_FORMAT, level=LOG_LEVEL) | |
logging.info("Starting BluePlayer") | |
gobject.threads_init() | |
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) | |
player = None | |
try: | |
player = BluePlayer() | |
mainloop = gobject.MainLoop() | |
mainloop.run() | |
except KeyboardInterrupt as ex: | |
logging.info("BluePlayer canceled by user") | |
except Exception as ex: | |
logging.error("How embarrassing. The following error occurred {}".format(ex)) | |
finally: | |
if player: | |
player.shutdown() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment