Created
January 8, 2015 11:47
-
-
Save oleeander/9d5b94d43d22c1b71208 to your computer and use it in GitHub Desktop.
mpris2 faderstart + dpflib display
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2 | |
import argparse
import time
import traceback

import cairo
import dbus
from dbus.mainloop.glib import DBusGMainLoop
import dpflib_c
from gi.repository import GObject
from PIL import Image
import serial
class DbusOwnerChangeWatch(object):
    """Attach a Faderstart to whichever MPRIS2 player shows up on the bus.

    Listens to the session bus' ``NameOwnerChanged`` signal.  When a bus name
    containing ``org.mpris.MediaPlayer2`` gains an owner, any running
    Faderstart is stopped and a new one is created for that name.  When the
    name the current Faderstart is bound to loses its owner, the Faderstart
    is stopped and forgotten.
    """

    # Substring looked for in bus names to recognise an MPRIS2 player.
    _MPRIS_NAME = 'org.mpris.MediaPlayer2'

    def __init__(self, serial_port='/dev/ttyUSB0'):
        """Hook into the session bus.

        serial_port -- device path handed to every Faderstart created later.
        """
        # The GLib main loop must be the default before the bus is opened,
        # otherwise signal callbacks are never dispatched.
        DBusGMainLoop(set_as_default=True)
        self._serial_port = serial_port
        self._bus = dbus.SessionBus()
        self._proxy = self._bus.get_object('org.freedesktop.DBus',
                                           '/org/freedesktop/DBus')
        self._proxy.connect_to_signal('NameOwnerChanged', self._owner_changed)

    def _owner_changed(self, name, old_owner, new_owner):
        """Handle ``NameOwnerChanged(name, old_owner, new_owner)``.

        An empty ``old_owner`` means the name has just appeared on the bus;
        an empty ``new_owner`` means it has just vanished.
        """
        if not old_owner and self._MPRIS_NAME in name:
            # A player appeared: replace whatever we were attached to.
            self._drop_faderstart()
            self._faderstart = Faderstart(self._bus, name, self._serial_port)
        elif not new_owner and hasattr(self, '_faderstart'):
            if name == self._faderstart.player_bus_name:
                self._drop_faderstart()

    def _drop_faderstart(self):
        """Forget the current Faderstart (if any) and stop it.

        The reference is removed *before* ``stop()`` runs so that an error
        while stopping (or while constructing a replacement afterwards) can
        never leave an already-stopped instance behind to be stopped twice.
        """
        faderstart = getattr(self, '_faderstart', None)
        if faderstart is None:
            return
        del self._faderstart
        faderstart.stop()
class MprisInterface(object):
    """Thin wrapper around one player's ``org.mpris.MediaPlayer2.Player``.

    Property reads go through ``org.freedesktop.DBus.Properties.Get``;
    transport commands go through the Player interface itself.  The metadata
    getters fall back to a placeholder (``'-'`` or ``0``) when the player
    does not provide the requested entry.
    """

    _PLAYER_IFACE = 'org.mpris.MediaPlayer2.Player'
    # Length and position are divided by this to obtain whole seconds.
    _USEC_PER_SEC = 1000000

    def __init__(self, dbus_sessionbus, player_bus_name):
        """Bind to ``/org/mpris/MediaPlayer2`` of *player_bus_name*."""
        self._bus = dbus_sessionbus
        self._player_object = self._bus.get_object(player_bus_name,
                                                   '/org/mpris/MediaPlayer2')
        self._player_iface = dbus.Interface(
            self._player_object, dbus_interface=self._PLAYER_IFACE)
        self._player_props = dbus.Interface(
            self._player_object,
            dbus_interface='org.freedesktop.DBus.Properties')

    def _get_property(self, name):
        """Return the raw value of Player property *name*."""
        return self._player_props.Get(self._PLAYER_IFACE, name)

    def _get_metadata(self, key):
        """Return the ``Metadata`` entry *key*, or None when it is absent."""
        # Plain text keys hash and compare equal to the dbus string keys, so
        # no dbus.String wrapper is needed for the lookup.
        return self._get_property('Metadata').get(key)

    def get_status(self):
        """Return the player's ``PlaybackStatus`` property."""
        return self._get_property('PlaybackStatus')

    def get_artist(self):
        """Return the first ``xesam:artist`` entry as UTF-8, or ``'-'``."""
        try:
            return self._get_metadata(u'xesam:artist')[0].encode('utf-8')
        except (TypeError, AttributeError, IndexError):
            # Missing entry (None), unexpected type, or an empty artist list.
            return '-'

    def get_title(self):
        """Return ``xesam:title`` as UTF-8, or ``'-'`` when unavailable."""
        try:
            return self._get_metadata(u'xesam:title').encode('utf-8')
        except (TypeError, AttributeError):
            return '-'

    def get_length(self):
        """Return ``mpris:length`` in whole seconds, or 0 when unavailable."""
        try:
            return self._get_metadata(u'mpris:length') // self._USEC_PER_SEC
        except (TypeError, AttributeError):
            return 0

    def get_position(self):
        """Return ``Position`` in whole seconds, or 0 when unavailable."""
        try:
            return self._get_property('Position') // self._USEC_PER_SEC
        except TypeError:
            return 0

    def play(self):
        """Send ``Play`` to the player."""
        self._player_iface.Play()

    def playpause(self):
        """Send ``PlayPause`` to the player."""
        self._player_iface.PlayPause()

    def pause(self):
        """Send ``Pause`` to the player."""
        self._player_iface.Pause()
class Faderstart(object):
    """Tie a serial fader-start line and a DPF display to one MPRIS2 player.

    Lines arriving on the serial port (``START`` / ``STOP``) are turned into
    playback commands, and the display is redrawn every 500 ms from the
    player's current state.
    """

    def __init__(self, dbus_sessionbus, player_bus_name, serial_port):
        """Open the serial port, the player proxy and the display.

        dbus_sessionbus -- session bus used to reach the player.
        player_bus_name -- bus name of the player; kept as a public attribute.
        serial_port     -- device path opened at 9600 baud, 1 s read timeout.
        """
        self.player_bus_name = player_bus_name

        # Serial side: discard stale input, then let the main loop wake us
        # whenever the descriptor becomes readable.
        self._serial_iface = serial.Serial(serial_port, 9600, timeout=1)
        port = self._serial_iface
        port.flushInput()
        self._serial_watch = GObject.io_add_watch(
            port.fileno(), GObject.IO_IN, self._serial_in)

        self._player = MprisInterface(dbus_sessionbus, player_bus_name)

        # Display side: backlight on, periodic redraw.
        self._dpf = Display()
        display = self._dpf
        display.setBacklight(21)
        self._dpf_timeout = GObject.timeout_add(
            500, display.update, self._player)

    def _serial_in(self, fd, condition):
        """Main-loop callback: act on one line read from the serial port.

        Always returns True so the watch stays installed.
        """
        line = str(self._serial_iface.readline()).strip()
        # The current playback status has to be known before acting; this is
        # what keeps the commands compatible with Spotify.
        state = self._player.get_status()
        if line == 'START':
            if state in ('Paused', 'Stopped'):
                self._player.playpause()
        elif line == 'STOP':
            if state == 'Playing':
                self._player.pause()
        return True

    def stop(self):
        """Remove both main-loop sources, then close the port and display."""
        for source_id in (self._dpf_timeout, self._serial_watch):
            GObject.source_remove(source_id)
        self._serial_iface.close()
        display = self._dpf
        display.setBacklight(0)
        display.close()
def _format_hms(seconds):
    """Return *seconds* as a zero-padded ``HH:MM:SS`` string."""
    minutes, secs = divmod(seconds, 60)
    hours, minutes = divmod(minutes, 60)
    return '{:02d}:{:02d}:{:02d}'.format(int(hours), int(minutes), int(secs))


def _ellipsize(text, limit):
    """Shorten *text* to at most *limit* characters, ending in ``'..'``.

    UTF-8 byte strings are decoded first so the cut never lands inside a
    multibyte character, and are re-encoded afterwards; the result has the
    same type as the input.  Text that already fits is returned unchanged.
    """
    was_bytes = isinstance(text, bytes)
    if was_bytes:
        text = text.decode('utf-8', 'replace')
    if len(text) > limit:
        text = text[:limit - 2] + u'..'
    return text.encode('utf-8') if was_bytes else text


def _draw_centered(cr, text, center_x, center_y):
    """Draw *text* with the current font, centred on (center_x, center_y)."""
    x_bearing, y_bearing, width, height = cr.text_extents(text)[:4]
    cr.move_to(center_x - width / 2 - x_bearing,
               center_y - height / 2 - y_bearing)
    cr.show_text(text)
    cr.stroke()


class Display(dpflib_c.DPFPearl_c):
    """DPF display showing position, length, remaining time, title, artist."""

    def __init__(self, device='usb0', name='Pearl DPF'):
        """Open the display through the ``DPFPearl_c`` base class."""
        dpflib_c.DPFPearl_c.__init__(self, device, name)

    def update(self, player, *args):
        """Render the state of *player* to a 320x240 image and show it.

        player -- object providing get_artist(), get_title(), get_length(),
                  get_position() (both in seconds) and get_status().
        *args  -- ignored; accepted so the method can be used as a callback.

        Always returns True so a periodic timeout that calls it keeps running.
        """
        artist = _ellipsize(player.get_artist(), 20)
        title = _ellipsize(player.get_title(), 17)
        length = player.get_length()
        position = player.get_position()
        # Clamp: a position beyond the reported length (e.g. length unknown
        # and reported as 0) must not be rendered as a negative clock.
        remaining = max(length - position, 0)
        ending_soon = remaining <= 10

        surface = cairo.ImageSurface(cairo.FORMAT_ARGB32, 320, 240)
        cr = cairo.Context(surface)

        # Background: red-orange during the last ten seconds, white otherwise.
        if ending_soon:
            cr.set_source_rgb(1.0, 0.2, 0.0)
        else:
            cr.set_source_rgb(1.0, 1.0, 1.0)
        cr.rectangle(0, 0, 320, 240)
        cr.fill()

        cr.select_font_face('Mono')

        # Top row: elapsed time on the left, total length on the right.
        cr.set_font_size(30)
        cr.set_source_rgb(0.00, 0.00, 0.00)
        _draw_centered(cr, _format_hms(position), 80, 35)
        _draw_centered(cr, _format_hms(length), 240, 35)

        # Large remaining-time counter: white on the warning background,
        # green while playing, otherwise it stays black.
        cr.set_font_size(50)
        if ending_soon:
            cr.set_source_rgb(1.00, 1.00, 1.00)
        elif player.get_status() == 'Playing':
            cr.set_source_rgb(0.00, 0.40, 0.00)
        _draw_centered(cr, '-' + _format_hms(remaining), 164, 100)

        cr.set_font_size(30)
        cr.set_source_rgb(0.00, 0.00, 0.00)
        _draw_centered(cr, title, 164, 165)

        cr.set_font_size(25)
        _draw_centered(cr, artist, 164, 205)

        # Pending drawing must be flushed before the pixel buffer is read.
        surface.flush()
        im = Image.frombuffer('RGBA',
                              (surface.get_width(), surface.get_height()),
                              surface.get_data(), 'raw', 'BGRA', 0, 1)
        self.showImage(im)
        return True
def build_arg_parser():
    """Return the command-line parser for this script."""
    parser = argparse.ArgumentParser(
        description='Adding fader start and the ability to drive a display '
                    'to mpris2 compatible players.')
    parser.add_argument('--sport', type=str, default='/dev/ttyUSB0',
                        help='the serial port')
    return parser


def main(argv=None):
    """Run the watcher until interrupted.

    argv -- argument list to parse; None means ``sys.argv[1:]``.

    Any error raised while setting up the watcher or running the main loop is
    printed and followed by a retry after one second.  Ctrl-C ends the
    program instead of being swallowed by the retry loop.
    """
    args = build_arg_parser().parse_args(argv)
    GObject.threads_init()
    while True:
        try:
            # Keep a reference so the watcher lives as long as the loop runs.
            watch = DbusOwnerChangeWatch(args.sport)
            GObject.MainLoop().run()
        except KeyboardInterrupt:
            break
        except Exception:
            traceback.print_exc()
            time.sleep(1)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment