Skip to content

Instantly share code, notes, and snippets.

@Hacksore
Created October 6, 2018 16:04
Show Gist options
  • Save Hacksore/b8ba1b9f378be6f806eee6615790bd96 to your computer and use it in GitHub Desktop.
Save Hacksore/b8ba1b9f378be6f806eee6615790bd96 to your computer and use it in GitHub Desktop.
python bluetooth metadata with dbus
#!/usr/bin/env python
import time
import signal
import dbus
import dbus.service
import dbus.mainloop.glib
import gobject
import logging
import hashlib
import os
import json
# Well-known D-Bus name of the BlueZ daemon; every interface name below is
# derived from it.
SERVICE_NAME = "org.bluez"
AGENT_IFACE = "{0}.Agent1".format(SERVICE_NAME)
ADAPTER_IFACE = "{0}.Adapter1".format(SERVICE_NAME)
DEVICE_IFACE = "{0}.Device1".format(SERVICE_NAME)
PLAYER_IFACE = "{0}.MediaPlayer1".format(SERVICE_NAME)
TRANSPORT_IFACE = "{0}.MediaTransport1".format(SERVICE_NAME)

# Logging configuration consumed by logging.basicConfig() at the bottom of
# the script. Set LOG_LEVEL to logging.DEBUG for verbose output.
LOG_LEVEL = logging.INFO
LOG_FILE = "/dev/stdout"
LOG_FORMAT = "%(asctime)s %(levelname)s %(message)s"
def getManagedObjects():
    """Return the result of ObjectManager.GetManagedObjects() on BlueZ's root.

    Callers in this file treat the result as a mapping of object path to a
    mapping keyed by interface name.
    """
    root_object = dbus.SystemBus().get_object("org.bluez", "/")
    object_manager = dbus.Interface(
        root_object, "org.freedesktop.DBus.ObjectManager")
    return object_manager.GetManagedObjects()
def findAdapter():
    """Return an Adapter1 interface proxy for the first Bluetooth adapter.

    Walks the objects reported by getManagedObjects() and wraps the first one
    that exposes ADAPTER_IFACE.

    Raises:
        Exception: if no managed object exposes the adapter interface.
    """
    objects = getManagedObjects()
    bus = dbus.SystemBus()
    # .items() instead of the Python 2-only .iteritems(): same iteration on
    # Python 2, and it does not raise AttributeError on Python 3.
    for path, ifaces in objects.items():
        if ifaces.get(ADAPTER_IFACE) is None:
            continue
        obj = bus.get_object(SERVICE_NAME, path)
        return dbus.Interface(obj, ADAPTER_IFACE)
    raise Exception("Bluetooth adapter not found")
class BluePlayer(dbus.service.Object):
    """Follow the BlueZ media player and dump the current track to test.json.

    On construction the object is exported at AGENT_PATH, subscribes to
    org.freedesktop.DBus.Properties.PropertiesChanged signals from org.bluez,
    registers itself as the default agent and looks up any media player and
    transport that already exist.

    NOTE(review): no org.bluez.Agent1 methods are defined in this class even
    though it registers as an agent -- presumably pairing requests that need
    agent interaction will fail; confirm before relying on it for pairing.
    """

    AGENT_PATH = "/blueplayer/agent"
    CAPABILITY = "DisplayOnly"
    # Interface that carries VolumeUp(); per the BlueZ media API it lives on
    # the device object (and is marked deprecated there).
    CONTROL_IFACE = SERVICE_NAME + ".MediaControl1"
    _PROPS_IFACE = "org.freedesktop.DBus.Properties"

    # Class-level defaults; instances overwrite them by assignment.
    bus = None
    adapter = None
    device = None
    deviceAlias = None
    player = None
    transport = None
    connected = None
    state = None
    status = None
    discoverable = None
    # Only ever rebound (never mutated in place), so sharing this default
    # dict between instances is harmless.
    track = {
        "artist": None,
        "title": None,
        "checksum": None
    }

    def __init__(self):
        """Export the object, hook the signal handler, register, find player."""
        self.bus = dbus.SystemBus()
        # Reuse the bus obtained above instead of requesting it a second time.
        dbus.service.Object.__init__(self, self.bus, BluePlayer.AGENT_PATH)
        self.bus.add_signal_receiver(self.playerHandler,
                                     bus_name="org.bluez",
                                     dbus_interface=BluePlayer._PROPS_IFACE,
                                     signal_name="PropertiesChanged",
                                     path_keyword="path")
        self.registerAgent()
        self.findPlayer()

    def start(self):
        """Run a GLib main loop; call shutdown() if it is interrupted.

        The original handler called the non-existent self.end() from a bare
        ``except:``, so any interruption surfaced as an AttributeError.
        """
        mainloop = gobject.MainLoop()
        try:
            mainloop.run()
        except (KeyboardInterrupt, Exception):
            self.shutdown()

    def findPlayer(self):
        """Find any current media players and associated device"""
        manager = dbus.Interface(self.bus.get_object("org.bluez", "/"),
                                 "org.freedesktop.DBus.ObjectManager")
        objects = manager.GetManagedObjects()

        player_path = None
        transport_path = None
        # .items() works on both Python 2 and 3 (iteritems() is 2-only).
        # When several objects match, the last one visited wins.
        for path, interfaces in objects.items():
            if PLAYER_IFACE in interfaces:
                player_path = path
            if TRANSPORT_IFACE in interfaces:
                transport_path = path

        if player_path:
            self.connected = True
            self.getPlayer(player_path)
            player_properties = self.player.GetAll(
                PLAYER_IFACE, dbus_interface=BluePlayer._PROPS_IFACE)
            if "Status" in player_properties:
                self.status = player_properties["Status"]
            if "Track" in player_properties:
                self.track = player_properties["Track"]
        else:
            logging.debug("Could not find player")

        if transport_path:
            self.transport = self.bus.get_object("org.bluez", transport_path)
            transport_properties = self.transport.GetAll(
                TRANSPORT_IFACE, dbus_interface=BluePlayer._PROPS_IFACE)
            if "State" in transport_properties:
                self.state = transport_properties["State"]

    def getPlayer(self, path):
        """Bind self.player to the object at *path* and resolve its device."""
        self.player = self.bus.get_object("org.bluez", path)
        device_path = self.player.Get("org.bluez.MediaPlayer1", "Device",
                                      dbus_interface=BluePlayer._PROPS_IFACE)
        self.getDevice(device_path)

    def getDevice(self, path):
        """Bind self.device to the object at *path* and cache its Alias."""
        self.device = self.bus.get_object("org.bluez", path)
        self.deviceAlias = self.device.Get(
            DEVICE_IFACE, "Alias", dbus_interface=BluePlayer._PROPS_IFACE)

    def playerHandler(self, interface, changed, invalidated, path):
        """Handle a PropertiesChanged signal coming from org.bluez.

        interface   -- name of the interface whose properties changed
        changed     -- mapping of changed property name to new value
        invalidated -- unused here
        path        -- object path of the sender (via path_keyword); unused
        """
        # Only the last dotted component is compared, as in the original.
        if interface.rsplit(".", 1)[-1] != "MediaPlayer1":
            return
        if "Track" in changed and "Title" in changed["Track"]:
            self.onTrackChange(changed["Track"])
        if "Status" in changed:
            self.status = changed["Status"]

    def onTrackChange(self, changed):
        """Write the new track to test.json and print it, if it differs.

        *changed* is the "Track" mapping from a MediaPlayer1 property change.
        The track is considered new when its title+artist key (spaces
        removed) differs from the one stored in self.track.
        """
        if not changed:
            return
        # A track may carry a Title without an Artist; fall back to "" rather
        # than raising KeyError. The `+ ""` is kept from the original,
        # presumably to get a plain string instead of a dbus.String.
        title = changed.get("Title", "") + ""
        artist = changed.get("Artist", "") + ""
        checksum = (title + artist).replace(" ", "")
        if checksum == self.track.get("checksum"):
            return

        self.track = {
            "title": title,
            "artist": artist,
            "checksum": checksum
        }
        jsonString = json.dumps(self.track, indent=4)
        # Context manager so the file is closed even if the write fails.
        with open("test.json", "wb") as trackFile:
            trackFile.write(jsonString.encode("utf-8", "replace"))
        print(jsonString)

    def next(self):
        """Ask the media player to skip to the next track."""
        self.player.Next(dbus_interface=PLAYER_IFACE)

    def previous(self):
        """Ask the media player to go back to the previous track."""
        self.player.Previous(dbus_interface=PLAYER_IFACE)

    def play(self):
        """Ask the media player to start or resume playback."""
        self.player.Play(dbus_interface=PLAYER_IFACE)

    def pause(self):
        """Ask the media player to pause playback."""
        self.player.Pause(dbus_interface=PLAYER_IFACE)

    def volumeUp(self):
        """Send a volume-up request through the device's MediaControl1.

        The original referenced an undefined ``self.control`` and
        ``CONTROL_IFACE`` and then called VolumeUp on MediaTransport1, which
        does not offer that method; the request now goes to the device
        object, where MediaControl1 is exported.
        """
        self.device.VolumeUp(dbus_interface=BluePlayer.CONTROL_IFACE)

    def shutdown(self):
        """Placeholder shutdown hook; currently only prints 1."""
        print(1)

    def getStatus(self):
        """Return the last playback status seen (None until one arrives)."""
        return self.status

    def registerAgent(self):
        """Register BluePlayer as the default agent"""
        manager = dbus.Interface(
            self.bus.get_object(SERVICE_NAME, "/org/bluez"),
            "org.bluez.AgentManager1")
        manager.RegisterAgent(BluePlayer.AGENT_PATH, BluePlayer.CAPABILITY)
        manager.RequestDefaultAgent(BluePlayer.AGENT_PATH)
        logging.debug("Blueplayer is registered as a default agent")
# Script entry: set up logging, install the GLib-based D-Bus main loop as the
# default, then run BluePlayer until the loop stops or is interrupted.
logging.basicConfig(level=LOG_LEVEL, format=LOG_FORMAT, filename=LOG_FILE)
logging.info("Starting BluePlayer")

gobject.threads_init()
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

# Stays None if construction fails, so the cleanup below can tell.
player = None
try:
    player = BluePlayer()
    mainloop = gobject.MainLoop()
    mainloop.run()
except KeyboardInterrupt:
    logging.info("BluePlayer canceled by user")
except Exception as ex:
    failure = "How embarrassing. The following error occurred {}".format(ex)
    logging.error(failure)
finally:
    if player:
        player.shutdown()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment