Skip to content

Instantly share code, notes, and snippets.

@oleeander
Created January 8, 2015 11:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save oleeander/9d5b94d43d22c1b71208 to your computer and use it in GitHub Desktop.
Save oleeander/9d5b94d43d22c1b71208 to your computer and use it in GitHub Desktop.
mpris2 faderstart + dpflib display
#!/usr/bin/env python2
import argparse
import time
from gi.repository import GObject
import dbus
from dbus.mainloop.glib import DBusGMainLoop
import serial
import dpflib_c
from PIL import Image
import cairo
class DbusOwnerChangeWatch(object):
def __init__(self, serial_port='/dev/ttyUSB0'):
DBusGMainLoop(set_as_default=True)
self._serial_port = serial_port
self._bus = dbus.SessionBus()
self._proxy = self._bus.get_object('org.freedesktop.DBus',
'/org/freedesktop/DBus')
self._proxy.connect_to_signal('NameOwnerChanged', self._owner_changed)
def _owner_changed(self, name, new, old):
if len(new) == 0 and 'org.mpris.MediaPlayer2' in name:
if hasattr(self, '_faderstart'):
self._faderstart.stop()
self._faderstart = Faderstart(self._bus, name, self._serial_port)
elif len(old) == 0 and hasattr(self, '_faderstart'):
if name == self._faderstart.player_bus_name:
self._faderstart.stop()
delattr(self, '_faderstart')
class MprisInterface(object):
def __init__(self, dbus_sessionbus, player_bus_name):
self._bus = dbus_sessionbus
self._player_object = self._bus.get_object(player_bus_name, '/org/mpris/MediaPlayer2')
self._player_iface = dbus.Interface(self._player_object,
dbus_interface='org.mpris.MediaPlayer2.Player')
self._player_props = dbus.Interface(self._player_object,
dbus_interface='org.freedesktop.DBus.Properties')
def get_status(self):
return self._player_props.Get('org.mpris.MediaPlayer2.Player',
'PlaybackStatus')
def get_artist(self):
try:
return self._player_props.Get('org.mpris.MediaPlayer2.Player',
'Metadata').get(dbus.String(u'xesam:artist'))[0].encode('utf-8')
except (TypeError, AttributeError):
return '-'
def get_title(self):
try:
return self._player_props.Get('org.mpris.MediaPlayer2.Player',
'Metadata').get(dbus.String(u'xesam:title')).encode('utf-8')
except (TypeError, AttributeError):
return '-'
def get_length(self):
try:
return self._player_props.Get('org.mpris.MediaPlayer2.Player',
'Metadata').get(dbus.String(u'mpris:length')) / 1000000
except TypeError:
return 0
def get_position(self):
try:
return self._player_props.Get('org.mpris.MediaPlayer2.Player',
'Position') / 1000000
except TypeError:
return 0
def play(self):
self._player_iface.Play()
def playpause(self):
self._player_iface.PlayPause()
def pause(self):
self._player_iface.Pause()
class Faderstart(object):
def __init__(self, dbus_sessionbus, player_bus_name, serial_port):
self.player_bus_name = player_bus_name
self._serial_iface = serial.Serial(serial_port, 9600, timeout=1)
self._serial_iface.flushInput()
self._serial_watch = GObject.io_add_watch(self._serial_iface.fileno(), GObject.IO_IN, self._serial_in)
self._player = MprisInterface(dbus_sessionbus, player_bus_name)
self._dpf = Display()
self._dpf.setBacklight(21)
self._dpf_timeout = GObject.timeout_add(500, self._dpf.update, self._player)
def _serial_in(self, fd, condition):
command = str(self._serial_iface.readline()).strip()
# the knowledge of the current playback status is needed to be compatible with spotify. *sigh*
# http://www.reactiongifs.com/wp-content/uploads/2013/04/friendly-fire.gif
status = self._player.get_status()
if command == 'START' and status in ['Paused', 'Stopped']:
self._player.playpause()
elif command == 'STOP' and status == 'Playing':
self._player.pause()
return True
def stop(self):
GObject.source_remove(self._dpf_timeout)
GObject.source_remove(self._serial_watch)
self._serial_iface.close()
self._dpf.setBacklight(0)
self._dpf.close()
class Display(dpflib_c.DPFPearl_c):
def __init__(self, device='usb0', name='Pearl DPF'):
dpflib_c.DPFPearl_c.__init__(self, device, name)
def update(self, player, *kwargs):
artist = player.get_artist()
if len(artist) >= 20:
artist = artist[0:20-2]+'..'
title = player.get_title()
if len(title) >= 17:
title = title[0:17-2]+'..'
length = player.get_length()
position = player.get_position()
m, s = divmod(length - position, 60)
h, m = divmod(m, 60)
remaining_hms = (int(h), int(m), int(s))
m, s = divmod(length, 60)
h, m = divmod(m, 60)
length_hms = (int(h), int(m), int(s))
m, s = divmod(position, 60)
h, m = divmod(m, 60)
position_hms = (int(h), int(m), int(s))
surface = cairo.ImageSurface(cairo.FORMAT_ARGB32, 320, 240)
cr = cairo.Context(surface)
if length - position <= 10:
cr.set_source_rgb(1.0, 0.2, 0.0)
else:
cr.set_source_rgb(1.0, 1.0, 1.0)
cr.rectangle(0, 0, 320, 240)
cr.fill()
cr.select_font_face('Mono')
cr.set_font_size(30)
cr.set_source_rgb(0.00, 0.00, 0.00)
x_bearing, y_bearing, width, height = cr.text_extents(
'{:02d}:{:02d}:{:02d}'.format(*position_hms))[:4]
cr.move_to(80 - width / 2 - x_bearing, 35 - height / 2 - y_bearing)
cr.show_text('{:02d}:{:02d}:{:02d}'.format(*position_hms))
cr.stroke()
x_bearing, y_bearing, width, height = cr.text_extents(
'{:02d}:{:02d}:{:02d}'.format(*length_hms))[:4]
cr.move_to(240 - width / 2 - x_bearing, 35 - height / 2 - y_bearing)
cr.show_text('{:02d}:{:02d}:{:02d}'.format(*length_hms))
cr.stroke()
cr.set_font_size(50)
if length - position <= 10:
cr.set_source_rgb(1.00, 1.00, 1.00)
elif player.get_status() == 'Playing':
cr.set_source_rgb(0.00, 0.40, 0.00)
x_bearing, y_bearing, width, height = cr.text_extents(
'-{:02d}:{:02d}:{:02d}'.format(*remaining_hms))[:4]
cr.move_to(164 - width / 2 - x_bearing, 100 - height / 2 - y_bearing)
cr.show_text('-{:02d}:{:02d}:{:02d}'.format(*remaining_hms))
cr.stroke()
cr.set_font_size(30)
cr.set_source_rgb(0.00, 0.00, 0.00)
x_bearing, y_bearing, width, height = cr.text_extents(title)[:4]
cr.move_to(164 - width / 2 - x_bearing, 165 - height / 2 - y_bearing)
cr.show_text(title)
cr.stroke()
cr.set_font_size(25)
x_bearing, y_bearing, width, height = cr.text_extents(artist)[:4]
cr.move_to(164 - width / 2 - x_bearing, 205 - height / 2 - y_bearing)
cr.show_text(artist)
cr.stroke()
im = Image.frombuffer('RGBA', (surface.get_width(), surface.get_height()),
surface.get_data(), 'raw', 'BGRA', 0, 1)
self.showImage(im)
return True
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Adding fader start and the' +
'ability to drive a display to mpris2 compatible players.')
#parser.add_argument('--player', type=str, default='rhythmbox',
# help='the bus name of the player')
parser.add_argument('--sport', type=str, default='/dev/ttyUSB0',
help='the serial port')
args = parser.parse_args()
while True:
try:
GObject.threads_init()
w = DbusOwnerChangeWatch(args.sport)
GObject.MainLoop().run()
except:
time.sleep(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment