Skip to content

Instantly share code, notes, and snippets.

@jmoiron
Created May 25, 2017 05:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jmoiron/7429343c3cbd57c0c8dc1315b69bf586 to your computer and use it in GitHub Desktop.
Save jmoiron/7429343c3cbd57c0c8dc1315b69bf586 to your computer and use it in GitHub Desktop.
old weechat plugin using python interface for mp3 announcing from a bunch of diff linux mp3 players
#!/usr/bin/env python
#
# this module uses code from Michael Hudson's xmms-py modules
# this code is available in its original form here:
# http://www.python.net/crew/mwh/hacks/xmms-py.html
# the original code had this notice on it:
#
# Released by Michael Hudson on 2000-07-01, and again on 2001-04-26
# public domain; no warranty, no restrictions
#
# most of the support for xmms, beep, and audacious comes from
# various pieces of Hudson's modules
#
# licensed under the GNU GPL v2
# a copy of this license can be found:
# http://www.gnu.org/copyleft/gpl.html
#
#
# 11/26/07 - Fixed a bug w/ using juk/amarok w/o pydcop
# 11/28/07 - Started weechat port
# 07/13/08 - Changed Audacious support to Dbus, added BMPx (v 0.6)
class WeeChat(object):
    """Fake 'weechat' object. Can proxy in for printing, etc.

    Instances are always falsy, so ``if weechat:`` is only true for the real
    weechat module and never for this stand-in.
    """

    # Colour codes are listed before the bare '\003' they start with, so the
    # trailing digits are removed together with the control character.
    _FORMAT_CODES = ('\00302', '\00303', '\002', '\003')

    def __nonzero__(self):
        """Truth-value hook (Python 2): the fake object is always false."""
        return False

    # Python 3 looks up __bool__ instead of __nonzero__; alias it so the
    # object stays falsy on either interpreter.
    __bool__ = __nonzero__

    def prnt(self, string):
        """Strip the formatting codes from `string` and print it to stdout."""
        for code in self._FORMAT_CODES:
            string = string.replace(code, '')
        # Single-argument print(...) behaves identically on Python 2 and 3.
        print(string)

    # Without a real client, "running a command" just prints it.
    command = prnt
# Use the real weechat module when it can be imported; otherwise fall back
# to the printing stand-in so the script can run from a plain shell.
try:
    import weechat
except ImportError:
    weechat = WeeChat()
import sys, struct
import socket, os, pwd
from subprocess import *
# Optional IPC bindings: each name stays False when its binding is unusable,
# and the player classes test these names to pick a code path.
pcop, pydcop, bus = False, False, False
try:
    import pcop, pydcop
except Exception:
    # Best effort: any failure to load the DCOP bindings means "not available".
    pass
try:
    import dbus
    bus = dbus.SessionBus()
except Exception:
    # Either the import or the session-bus connection failed; `bus` keeps its
    # False default and `dbus` is marked unavailable as well.
    dbus = False
__module_name__ = "pymp3"
__module_version__ = "0.6"
__module_description__ = "mp3 announce/utils"
__debugging__ = False
if __debugging__:
    # traceback is only needed for the debug output in dispatch()
    import traceback
def print_debug(string):
    """Print `string` via weechat.prnt, but only when __debugging__ is set.

    The value is converted with str() and wrapped in the same colour codes
    the original used (colour 02, then a colour reset).
    """
    if not __debugging__:
        return
    text = str(string)
    weechat.prnt("\00302" + text + "\003")
def print_info(string):
    """Print `string` via weechat.prnt as an informational message.

    The value is converted with str() and wrapped in colour 03 followed by
    a colour reset.
    """
    text = str(string)
    weechat.prnt("\00303" + text + "\003")
# XXX: beep was superseded by BMPx, BMPx is in the process of being replaced
# by MPX (from the same developers). Hopefully, MPRIS will at least bring
# some stability to the IPC/RPC interface :)
players = {
'audacious' : 'audacious',
'bmpx' : 'beep-media-player-2',
'beep' : 'beep-media-player',
# 'xmms2' : 'xmms2d',
'xmms' : 'xmms',
'banshee' : 'banshee.exe',
'banshee1' : 'banshee-1',
'juk' : 'juk',
'amarok' : 'amarokapp',
'rhythmbox' : 'rhythmbox',
}
_player_order = ['audacious', 'bmpx', 'beep', 'xmms',
'banshee', 'juk', 'amarok', 'rhythmbox']
# find out which player is running
def which():
    """Return the key of the first supported player found in `ps aux`.

    Each line of the process list is checked against the search strings in
    `players`, trying the keys in `_player_order`; the first hit wins.

    Returns:
        The matching key from `players`, or None when no line matches.
    """
    # communicate() reads all output and waits for `ps` to exit, so the
    # child is reaped and its pipe closed instead of being left dangling.
    output = Popen(['ps', 'aux'], stdout=PIPE).communicate()[0]
    if not isinstance(output, str):
        # Python 3 hands back bytes; substring tests below need text.
        output = output.decode('utf-8', 'replace')
    for line in output.splitlines():
        for player in _player_order:
            if players[player] in line:
                return player
    return None
def command(runstr):
    """Run `runstr` as an external command (no shell) and return its stdout.

    The string is tokenised with shlex.split, so quoted arguments containing
    spaces stay together as one argument instead of being cut apart the way
    a plain str.split() would.

    Args:
        runstr: the command line, e.g. 'dcop juk Player play'.

    Returns:
        Whatever the process wrote to standard output.
    """
    import shlex
    return Popen(shlex.split(runstr), stdout=PIPE).communicate()[0]
# these players use xmms style command socket
SOCKET_PLAYERS = [
    'audacious',
    'beep',
    'xmms',
]


class SocketCommand:
    """Numeric command codes passed as `cmd` to XmmsConnection.send()."""

    # --- playback control ---
    CMD_PLAY = 2
    CMD_PAUSE = 3
    CMD_STOP = 4
    CMD_EJECT = 28

    # --- playlist navigation / modes ---
    # TODO: make socket_next and socket_prev use CMD_SET_PLAYLIST_POS
    #       instead of using next/prev repeatedly
    #CMD_SET_PLAYLIST_POS = 8
    CMD_PLAYLIST_PREV = 29
    CMD_PLAYLIST_NEXT = 30
    CMD_TOGGLE_REPEAT = 33
    CMD_TOGGLE_SHUFFLE = 34

    # --- queries ---
    CMD_GET_PLAYLIST_POS = 7
    CMD_GET_PLAYLIST_LENGTH = 9
    CMD_GET_OUTPUT_TIME = 11
    CMD_GET_PLAYLIST_FILE = 17
    CMD_GET_PLAYLIST_TITLE = 18
    CMD_GET_PLAYLIST_TIME = 19
    CMD_GET_INFO = 20
"""
I've tried to make the following class a facsimile of a "persistent connection",
but my attempts have led to the following error with xmms:
** WARNING **: ctrl_write_packet(): Failed to send data: Broken pipe
Even manually closing, deleting, and then re-initializing the socket did not avoid
this warning. It seems that only letting the garbage collector snag old Connection
objects makes xmms happy.
There is one aspect here missing from Hudson's original library: sending a custom
send format with the 'args' option. I wasn't using this feature in any requests,
as all of my provided formats were 'l' anyway.
"""
# Packet header layout: two 16-bit fields followed by one 32-bit length.
# The "=" prefix selects struct's standard sizes with no padding, so the
# header is 8 bytes on every platform.  The native "hhl" used previously is
# 16 bytes wherever C `long` is 8 bytes, which can never match the 8 bytes
# read_header has always asked the socket for.
_XMMS_HEADER_FORMAT = "=hhl"
_XMMS_HEADER_SIZE = struct.calcsize(_XMMS_HEADER_FORMAT)


class XmmsConnection:
    """A single-use connection to an xmms-style control socket.

    The socket path is /tmp/xmms_<login name of the effective uid>.<session>.
    Callers create a fresh connection per command (see Xmms._cmd); the socket
    is deliberately not closed explicitly and is left to garbage collection.
    """

    class ClientPacketHeader:
        """Plain record holding the three header fields of a packet."""

        def __init__(self):
            self.version, self.cmd, self.length = 0, 0, 0

        def __repr__(self):
            return "<< %s : version: %s cmd: %s length: %s >>" \
                % (self.__class__.__name__, self.version, self.cmd, self.length)

        def encode(self):
            """Return the header packed into its 8-byte wire form."""
            return struct.pack(_XMMS_HEADER_FORMAT,
                               self.version, self.cmd, self.length)

    def __init__(self, session=0):
        """Connect to the control socket of session number `session`."""
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.sock.connect("/tmp/xmms_%s.%d" % (pwd.getpwuid(os.geteuid())[0], session))

    def _recv_exact(self, size):
        """Read exactly `size` bytes from the socket.

        A stream socket may deliver fewer bytes than requested per recv(),
        so keep reading until the requested amount has arrived.

        Raises:
            socket.error: if the peer closes the connection early.
        """
        data = self.sock.recv(size)
        while len(data) < size:
            chunk = self.sock.recv(size - len(data))
            if not chunk:
                raise socket.error("control socket closed after %d of %d bytes"
                                   % (len(data), size))
            data += chunk
        return data

    def read_header(self):
        """Read one reply header and return it as a ClientPacketHeader."""
        head = self.ClientPacketHeader()
        head.version, head.cmd, head.length = struct.unpack(
            _XMMS_HEADER_FORMAT, self._recv_exact(_XMMS_HEADER_SIZE))
        return head

    def send(self, cmd, args=''):
        """Send command code `cmd`, with `args` as payload when it is an int.

        Any non-int `args` (including the default '') sends a header with a
        zero-length payload.
        """
        if isinstance(args, int):
            payload = struct.pack("=l", args)
            packet = struct.pack(_XMMS_HEADER_FORMAT, 1, cmd, len(payload)) + payload
        else:
            packet = struct.pack(_XMMS_HEADER_FORMAT, 1, cmd, 0)
        # sendall() retries until the whole packet has been written.
        self.sock.sendall(packet)

    def get_reply(self, format=''):
        """Read one reply and return its payload.

        Args:
            format: optional struct format.  When given, the payload is
                unpacked and a tuple is returned; formats without an explicit
                byte-order prefix are interpreted with standard sizes ("="),
                matching the header.  When empty, the raw payload is returned.
        """
        header = self.read_header()
        payload = self._recv_exact(header.length)
        if not format:
            return payload
        if format[0] not in '@=<>!':
            format = '=' + format
        return struct.unpack(format, payload)
# general utilities ---
def human_bitrate(bps):
    """Takes bits per second and returns a string w/ appropriate units.

    Heuristic for the scaling factor: a value whose decimal form ends in
    "00" is taken to be a round number of thousands (factor 1000); anything
    else is assumed to be a multiple of 1024.

    Args:
        bps: the bit rate as a number.

    Returns:
        A string such as '128.0 kbps'.  Values beyond the largest known unit
        are expressed in that unit rather than raising IndexError.
    """
    units = ['bps', 'kbps', 'Mbps']
    # Both factors are floats so the divisions below are true divisions on
    # Python 2 as well (an int factor silently dropped the fractional part).
    if str(bps).endswith("00"):
        reduce_factor = 1000.0
    else:
        reduce_factor = 1024.0
    # order of magnitude, capped at the last available unit
    oom = 0
    while oom < len(units) - 1 and bps / (reduce_factor ** (oom + 1)) >= 1:
        oom += 1
    return '%0.1f %s' % (bps / reduce_factor ** oom, units[oom])
def s_to_mmss(s):
    """Format a number of seconds as 'mm:ss'.

    `s` is passed through int() first, so numeric strings are accepted.
    Minutes are right-aligned in two columns; seconds are zero-padded.
    """
    minutes, seconds = divmod(int(s), 60)
    return '%2d:%02d' % (minutes, seconds)
def us_to_mmss(us):
    """Format a number of milliseconds as 'mm:ss'.

    Despite the `us` in the name, the value is divided by 1000 to obtain
    whole seconds, i.e. it is treated as milliseconds.
    """
    whole_seconds = int(us) // 1000
    return s_to_mmss(whole_seconds)
class MediaPlayer:
    """A superclass that implements some book-keeping and some convenience functions
    for media player objects. These objects print out as an announce string, and
    get info from the player through a consistent `get_info()` dictionary."""

    def __init__(self, name):
        """Remember the short player key (e.g. 'xmms') as `self.name`."""
        self.name = name

    def _nyi(self, name):
        """Emit a debug note that the operation `name` is not implemented."""
        # Both values must be passed as one tuple; "% name, self.name" applied
        # the format to `name` alone and raised TypeError.
        print_debug("%s not yet implemented for `%s`." % (name, self.name))

    def _empty_dict(self):
        """Return an empty info dictionary with all of the keys set."""
        keys = ['player', 'playlist_position', 'playlist_length', 'file',
                'display_title', 'elapsed', 'length', 'bitrate', 'frequency',
                'title', 'artist', 'album', 'track']
        d = {}
        for key in keys:
            d[key] = ''
        d['player'] = self.name
        return d

    # Default implementations: subclasses override whatever they support.
    def play(self): self._nyi('Play')
    def stop(self): self._nyi('Stop')
    def pause(self): self._nyi('Pause')
    def next(self): self._nyi('Next')
    def prev(self): self._nyi('Prev')
    def eject(self): self._nyi('eject')
    def open(self): self._nyi('open')
    def shuffle(self): self._nyi('shuffle')
    def repeat(self): self._nyi('repeat')

    def next_n(self, n):
        """Skip "forward" `n` songs. Should obey the player's current
        repeat settings, and preferably shuffle. Overwrite if player has
        Playlist position setting."""
        for i in range(n):
            self.next()

    def prev_n(self, n):
        """Go "backward" `n` songs. Should obey the player's current
        repeat settings, and preferably shuffle. Overwrite if player has
        Playlist position setting."""
        for i in range(n):
            self.prev()

    def get_info(self):
        """Return the info dictionary; the base class has nothing to report."""
        return self._empty_dict()

    def __str__(self):
        """Build the announce string from get_info().

        FIXME: This implements the old announce strings. It's probably easier
        to move this to the subclasses, but for now this is fine.
        """
        info = self.get_info()
        if self.name in SOCKET_PLAYERS:
            return '%s ~ [%s] of [%s] ~ %s ~ %sHz' % (
                info['display_title'], info['elapsed'], info['length'],
                info['bitrate'], info['frequency'])
        if self.name in ['juk', 'amarok']:
            return '%s - [%s] - %s ~ [%s] of [%s] ~ %s' % (
                info['artist'], info['album'], info['title'],
                info['elapsed'], info['length'], info['bitrate'])
        # 'banshee' and 'rhythmbox' use this format; it is also the fallback
        # for any other name so that __str__ never returns None (which would
        # make str() raise TypeError).
        return '%s - [%s] - %s ~ [%s] of [%s]' % (
            info['artist'], info['album'], info['title'],
            info['elapsed'], info['length'])

    def __repr__(self):
        return '<MediaPlayer %s ...>' % (self.name)
class Xmms(MediaPlayer):
    """MediaPlayer driven through the xmms-style control socket.

    Every command opens its own XmmsConnection, sends one request and reads
    one reply.
    """

    def __init__(self, name='xmms'):
        MediaPlayer.__init__(self, name)
        self._ifcache = {}

    def _makeConnection(self):
        """Return a new XmmsConnection for 'beep'/'xmms', otherwise False."""
        if self.name not in ['beep', 'xmms']:
            return False
        return XmmsConnection()

    def _cmd(self, command, args='', reply_format=''):
        """Send one command over a fresh connection and return its reply."""
        link = self._makeConnection()
        link.send(command, args=args)
        return link.get_reply(format=reply_format)

    def play(self):
        self._cmd(SocketCommand.CMD_PLAY)

    def stop(self):
        self._cmd(SocketCommand.CMD_STOP)

    def pause(self):
        self._cmd(SocketCommand.CMD_PAUSE)

    def next(self):
        self._cmd(SocketCommand.CMD_PLAYLIST_NEXT)

    def prev(self):
        self._cmd(SocketCommand.CMD_PLAYLIST_PREV)

    def eject(self):
        self._cmd(SocketCommand.CMD_EJECT)

    def open(self):
        # "open" is served by the same eject command
        self._cmd(SocketCommand.CMD_EJECT)

    def shuffle(self):
        self._cmd(SocketCommand.CMD_TOGGLE_SHUFFLE)

    def repeat(self):
        self._cmd(SocketCommand.CMD_TOGGLE_REPEAT)

    def get_info(self):
        """Query the player and return the filled-in info dictionary."""
        ask = self._cmd
        d = self._empty_dict()

        position = ask(SocketCommand.CMD_GET_PLAYLIST_POS, reply_format='l')[0]
        d['playlist_position'] = position
        d['playlist_length'] = ask(SocketCommand.CMD_GET_PLAYLIST_LENGTH,
                                   reply_format='l')[0]

        # raw replies: the final byte is dropped before storing
        raw_file = ask(SocketCommand.CMD_GET_PLAYLIST_FILE, args=position)
        d['file'] = raw_file[:-1]
        raw_title = ask(SocketCommand.CMD_GET_PLAYLIST_TITLE, args=position)
        d['display_title'] = raw_title[:-1]

        info = ask(SocketCommand.CMD_GET_INFO, reply_format='lll')
        utime_elapsed = ask(SocketCommand.CMD_GET_OUTPUT_TIME,
                            reply_format='l')[0]
        utime_length = ask(SocketCommand.CMD_GET_PLAYLIST_TIME,
                           args=position, reply_format='l')[0]

        d['elapsed'] = us_to_mmss(utime_elapsed)
        d['length'] = us_to_mmss(utime_length)
        d['bitrate'] = human_bitrate(info[0])
        d['frequency'] = info[1]
        return d
# True until the first Beep object has shown BEEP_MESSAGE.
BEEP_FIRST_RUN = True
BEEP_MESSAGE = (
    "beep-media-player has a bug with its control socket and returns "
    "bogus information for bitrate, frequency, and number of channels. "
    "Consider the 'audacious' media player, or BMPx, as beep-media-player "
    "is no longer in development."
)


class Beep(Xmms):
    """Xmms-socket player registered under the name 'beep'.

    The first construction prints BEEP_MESSAGE once via print_info.
    """

    def __init__(self):
        global BEEP_FIRST_RUN
        if BEEP_FIRST_RUN:
            print_info(BEEP_MESSAGE)
            BEEP_FIRST_RUN = False
        Xmms.__init__(self, 'beep')
# True until the first Bmpx object has had the chance to show the warning.
BMPX_FIRST_RUN = True
BMPX_4013_WARNING = """You are running bmpx version "%s", which has known \
bugs in the dbus interface. "BMP 0.40.14" fixes some of these, but pause \
support is still known to be broken in this release."""
BMPX_FORMAT = """%(artist)s - [%(album)s] - %(title)s ~ [%(length)s] \
~ %(kbps)s ~ %(freq)sHz"""


class Bmpx(MediaPlayer):
    """MediaPlayer for BMPx, controlled through its 'org.mpris.bmp' dbus objects.

    Without a dbus session bus the constructor returns early and the object
    has no player proxies; every operation on it will then fail.
    """

    def __init__(self, name="bmpx"):
        global BMPX_FIRST_RUN
        if not bus:
            return
        MediaPlayer.__init__(self, name)
        self.Root = bus.get_object('org.mpris.bmp', '/')
        self.Player = bus.get_object('org.mpris.bmp', '/Player')
        self.TrackList = bus.get_object('org.mpris.bmp', '/TrackList')
        # play/stop/pause read self.version on every instance, so it must be
        # set on every construction; only the warning is limited to once.
        # NOTE(review): the version checks are plain string comparisons, so
        # e.g. 'BMP 0.40.9' sorts after 'BMP 0.40.14' - confirm whether such
        # version strings can occur.
        self.version = str(self.Root.Identity())
        if BMPX_FIRST_RUN:
            BMPX_FIRST_RUN = False
            if self.version < 'BMP 0.40.14':
                # routed through print_info like the other players' notices
                print_info(BMPX_4013_WARNING % self.version)

    def play(self):
        if self.version < 'BMP 0.40.14':
            print_info("playing does not work with version \"%s\" of BMPx" % self.version)
            return
        self.Player.Play()

    def stop(self):
        if self.version < 'BMP 0.40.14':
            print_info("stop disabled for this version of BMPx, since playing does not work.")
            return
        self.Player.Stop()

    def pause(self):
        if self.version < 'BMP 0.40.15':
            print_info("pausing does not work with version \"%s\" of BMPx" % self.version)
            return
        self.Player.Pause()

    def next(self): self.Player.Next()
    def prev(self): self.Player.Prev()

    # are these necessary? maybe they should be removed
    def eject(self): pass
    def open(self): pass

    def get_info(self):
        """Return the metadata needed by BMPX_FORMAT as UTF-8 byte strings."""
        info = self.Player.GetMetadata()
        decode = lambda x: unicode(x).encode('utf-8')
        return {
            'artist' : decode(info['artist']),
            'album' : decode(info['album']),
            'title' : decode(info['title']),
            'length' : s_to_mmss(int(info['time'])),
            'kbps' : human_bitrate(int(info['bitrate'])),
            'freq' : decode(info['samplerate']),
        }

    def __str__(self):
        info = self.get_info()
        return BMPX_FORMAT % info
# True until the first Audacious object has been constructed.
AUDACIOUS_FIRST_RUN = True
AUDACIOUS_NODBUS = """Audacious deprecated the control socket interface many \
releases ago, and as of the release included with Ubuntu 8.04, it's officially \
gone. For now, the python dbus bindings are required for Audacious usage until \
a suitable interface using 'dbus-send' can be developed."""
AUDACIOUS_FORMAT = """%(artist)s - [%(album)s] - %(title)s ~ [%(elapsed)s] \
of [%(length)s] ~ %(kbps)s ~ %(freq)sHz"""


class Audacious(MediaPlayer):
    """MediaPlayer for Audacious, controlled through dbus.

    Without a dbus session bus the constructor returns early (printing
    AUDACIOUS_NODBUS the first time only) and the object has no proxies.
    """
    format = AUDACIOUS_FORMAT

    def __init__(self, name="audacious"):
        global AUDACIOUS_FIRST_RUN
        MediaPlayer.__init__(self, name)
        first_run = AUDACIOUS_FIRST_RUN
        AUDACIOUS_FIRST_RUN = False
        if not bus:
            # Bail out on EVERY construction without dbus, not only the first
            # one; previously later constructions fell through and called
            # get_object on the False placeholder.
            if first_run:
                print_info(AUDACIOUS_NODBUS)
            return
        self.bus = bus
        # set up the mpris interfaces
        self.Root = bus.get_object('org.mpris.audacious', '/')
        self.Player = bus.get_object('org.mpris.audacious', '/Player')
        self.TrackList = bus.get_object('org.mpris.audacious', '/TrackList')
        # XXX: this interface is going away in Audacious 2.0 as per nenolod
        self.Atheme = bus.get_object('org.atheme.audacious', '/org/atheme/audacious')

    def play(self): self.Player.Play()
    def stop(self): self.Player.Stop()
    def pause(self): self.Player.Pause()
    def next(self): self.Player.Next()
    def prev(self): self.Player.Prev()

    # are these necessary? maybe they should be removed
    def eject(self): self.Atheme.Eject()
    def open(self): self.Atheme.Eject()

    def __str__(self):
        info = self.get_info()
        return self.format % info

    def get_info(self):
        """Return the values needed by AUDACIOUS_FORMAT (plus 'channels')."""
        kbps, freq, ch = map(int, self.Atheme.GetInfo())
        info_dict = self.Player.GetMetadata()
        return {
            'kbps' : human_bitrate(kbps),
            'freq' : freq,
            'channels' : ch,
            'artist' : unicode(info_dict['artist']).encode('utf-8'),
            'album' : unicode(info_dict['album']).encode('utf-8'),
            'title' : unicode(info_dict['title']).encode('utf-8'),
            'elapsed' : us_to_mmss(self.Player.PositionGet()),
            'length' : us_to_mmss(info_dict['length']),
        }
# True until the "install python-dbus" hint has been shown.
BANSHEE_FIRST_RUN = True
BANSHEE_MESSAGE = (
    "Although banshee is supported without them, it is recommended "
    "that you install the python-dbus bindings for increased speed."
)


class Banshee(MediaPlayer):
    """MediaPlayer for Banshee with two back ends.

    Each public operation exists as `<op>_dbus` and `<op>_nodbus`; the
    constructor binds the public names on the instance to whichever set
    matches the availability of the dbus session bus.
    """

    def __init__(self):
        global BANSHEE_FIRST_RUN
        if BANSHEE_FIRST_RUN and not bus:
            print_info(BANSHEE_MESSAGE)
            BANSHEE_FIRST_RUN = False
        MediaPlayer.__init__(self, 'banshee')
        self._ifcache = {}
        interface = ['play', 'stop', 'pause', 'next', 'prev', 'eject', 'open', 'get_info']
        if bus:
            self.d_obj = bus.get_object("org.gnome.Banshee", "/org/gnome/Banshee/Player")
            self.banshee = dbus.Interface(self.d_obj, "org.gnome.Banshee.Core")
            suffix = 'dbus'
        else:
            suffix = 'nodbus'
        for public_name in interface:
            backend = getattr(self, '%s_%s' % (public_name, suffix))
            setattr(self, public_name, backend)

    # --- dbus back end ---
    def play_dbus(self):
        self.banshee.Play()

    def stop_dbus(self):
        self.banshee.Pause()

    def pause_dbus(self):
        self.banshee.TogglePlaying()

    def next_dbus(self):
        self.banshee.Next()

    def prev_dbus(self):
        self.banshee.Previous()

    def eject_dbus(self):
        self.banshee.ShowWindow()

    def open_dbus(self):
        self.banshee.ShowWindow()

    def get_info_dbus(self):
        """Collect track info through the dbus interface."""
        core = self.banshee
        d = self._empty_dict()
        d['length'] = s_to_mmss(core.GetPlayingDuration())
        d['elapsed'] = s_to_mmss(core.GetPlayingPosition())
        d['artist'] = unicode(core.GetPlayingArtist()).encode('UTF-8')
        d['title'] = unicode(core.GetPlayingTitle()).encode('UTF-8')
        d['album'] = unicode(core.GetPlayingAlbum()).encode('UTF-8')
        return d

    # --- command-line back end ---
    def play_nodbus(self):
        command('banshee --play')

    def stop_nodbus(self):
        command('banshee --pause')

    def pause_nodbus(self):
        command('banshee --toggle-playing')

    def next_nodbus(self):
        command('banshee --next')

    def prev_nodbus(self):
        command('banshee --previous')

    def eject_nodbus(self):
        command('banshee --show')

    def open_nodbus(self):
        command('banshee --show')

    # shuffle & repeat not yet implemented
    def get_info_nodbus(self):
        """Collect track info by running the banshee command-line client."""
        d = self._empty_dict()
        query = ('banshee --hide-field --query-title --query-artist '
                 '--query-position --query-album --query-duration')
        # one value per output line, read in the order:
        # duration, artist, album, title, position
        # banshee reports things in seconds
        fields = command(query).strip().split('\n')
        d['length'] = s_to_mmss(fields[0])
        d['artist'] = fields[1]
        d['album'] = fields[2]
        d['title'] = fields[3]
        d['elapsed'] = s_to_mmss(fields[4])
        return d
class Rhythmbox(MediaPlayer):
    """MediaPlayer class for Rhythmbox, a Gtk/Gnome media player.

    Only the dbus back end exists; constructing the object without a session
    bus raises an Exception.
    """

    def __init__(self):
        if not bus:
            raise Exception('Rhythmbox is not supported w/o python-dbus bindings.')
        MediaPlayer.__init__(self, 'rhythmbox')
        player_obj = bus.get_object("org.gnome.Rhythmbox", "/org/gnome/Rhythmbox/Player")
        shell_obj = bus.get_object("org.gnome.Rhythmbox", "/org/gnome/Rhythmbox/Shell")
        self.player = dbus.Interface(player_obj, "org.gnome.Rhythmbox.Player")
        self.shell = dbus.Interface(shell_obj, "org.gnome.Rhythmbox.Shell")

    def play(self):
        # only a play/pause toggle is available: flip it when not playing
        currently_playing = bool(self.player.getPlaying())
        if not currently_playing:
            self.player.playPause()

    def stop(self):
        # flip the toggle only when something is playing
        currently_playing = bool(self.player.getPlaying())
        if currently_playing:
            self.player.playPause()

    def pause(self):
        self.player.playPause()

    def next(self):
        self.player.next()

    def prev(self):
        self.player.previous()

    def eject(self):
        print_info("There isn't an easy way to do this in rhythmbox right now.")

    def open(self):
        print_info("There isn't an easy way to do this in rhythmbox right now.")

    def get_info(self):
        """Return the info dictionary for the currently playing URI."""
        d = self._empty_dict()
        uri = unicode(self.player.getPlayingUri())
        # re-key the song properties with plain unicode keys
        properties = {}
        for key, val in dict(self.shell.getSongProperties(uri)).items():
            properties[unicode(key)] = val
        d['length'] = s_to_mmss(int(properties.get('duration', 0)))
        d['elapsed'] = s_to_mmss(int(self.player.getElapsed()))
        for field in ('artist', 'album', 'title'):
            d[field] = unicode(properties.get(field, '')).encode('UTF-8')
        # The original note here reads: "Banshee reports a 'bitrate', but as
        # far as i can tell it's always 0".
        # NOTE(review): presumably Rhythmbox is meant - confirm.
        return d
# True until the "install python-dcop" hint has been shown.
JUK_FIRST_RUN = True
DCOP_MESSAGE = (
    "Although juk is supported without them, it is recommended that "
    "you install the python-dcop bindings for increased speed."
)


class Juk(MediaPlayer):
    """MediaPlayer class for Juk, a Qt/KDE media player.

    Whether to talk DCOP through pydcop or through the `dcop` command line is
    decided once in the constructor, which binds the matching callables on
    the instance.
    """

    def __init__(self):
        global JUK_FIRST_RUN
        if JUK_FIRST_RUN and not pydcop:
            print_info(DCOP_MESSAGE)
            JUK_FIRST_RUN = False
        MediaPlayer.__init__(self, 'juk')
        self._ifcache = {}
        # operations picked from the _<name>_dcop / _<name>_nodcop methods
        self._functions = ['eject', 'open']
        # public method name -> name of the call on juk's Player object
        self._func_map = {'play':'play', 'stop':'stop', 'pause':'playPause', 'next':'forward', 'prev':'back'}

        if pydcop:
            # pydcop available: go through the application proxy
            self.juk = pydcop.anyAppCalled("juk")

            def get_property(x):
                return self.juk.Player.trackProperty(x)

            def get_juk(func):
                return getattr(self.juk.Player, func)()

            suffix = 'dcop'
        else:
            # no pydcop: the same two helpers built on the 'command' interface
            def get_property(x):
                return command('dcop juk Player trackProperty %s' % (x)).strip()

            def get_juk(func):
                return command('dcop juk Player %s' % func)

            suffix = 'nodcop'

        self.get_property = get_property
        self.get_juk = get_juk
        for func in self._functions:
            setattr(self, func, getattr(self, '_%s_%s' % (func, suffix)))

        # every key of _func_map becomes a callable that forwards to get_juk;
        # the default argument freezes the Player call name per iteration
        for funcname, juk_property in self._func_map.items():
            setattr(self, funcname, (lambda prop=juk_property: self.get_juk(prop)))

    def _eject_dcop(self):
        for action in ("restore", "raise"):
            pcop.dcop_call("juk", "juk-mainwindow#1", action, ())

    def _open_dcop(self):
        self._eject_dcop()

    def _eject_nodcop(self):
        for action in ('restore', 'raise'):
            command('dcop juk juk-mainwindow#1 %s' % action)

    def _open_nodcop(self):
        self._eject_nodcop()

    def get_info(self):
        """Return the info dictionary for the current track."""
        d = self._empty_dict()
        d['elapsed'] = s_to_mmss(self.get_juk('currentTime'))
        d['title'] = self.get_property('Title')
        d['artist'] = self.get_property('Artist')
        d['album'] = self.get_property('Album')
        d['length'] = s_to_mmss(self.get_property('Seconds'))
        d['bitrate'] = '%s Kbps' % self.get_property('Bitrate')
        return d
# True until the "install python-dcop" hint has been shown.
AMAROK_FIRST_RUN = True
AMAROK_DCOP_MESSAGE = """Although amarok is supported without them, it is recommended that
you install the python-dcop bindings for increased speed.""".replace("\n", ' ')


class Amarok(MediaPlayer):
    """MediaPlayer class for Amarok, a Qt/KDE media player. This implementation is
    a bit messy because it resolves whether or not to use DCOP statically; after
    importing, the comparisons are made and the appropriate functions are used."""

    def __init__(self):
        global AMAROK_FIRST_RUN
        if AMAROK_FIRST_RUN and not pydcop:
            print_info(AMAROK_DCOP_MESSAGE)
            AMAROK_FIRST_RUN = False
        MediaPlayer.__init__(self, 'amarok')
        self._ifcache = {}
        # If pydcop is available, 'self.get_property' and friends use it; if
        # not, equivalents are built on the 'command' interface. Then, using
        # 'self.get_property', 'self.play', 'self.stop', etc. are bound on the
        # object.
        self._functions = ['play', 'stop', 'pause']
        if pydcop:
            self.amarok = pydcop.anyAppCalled("amarok")
            self.get_property = (lambda x: getattr(self.amarok.player, x)())
            self.get_playlist = (lambda x: getattr(self.amarok.playlist, x)())
            self.set_playlist = (lambda x: self.amarok.playlist.playByIndex(x))
        else:
            self.get_property = (lambda x: command('dcop amarok player %s' % x).strip())
            self.get_playlist = (lambda x: command('dcop amarok playlist %s' % x).strip())
            self.set_playlist = (lambda x: command('dcop amarok playlist playByIndex %s' % x))
        for func in self._functions:
            # default argument freezes the call name for each bound callable
            setattr(self, func, (lambda func=func: self.get_property(func)))

    def open(self): print_info("There isn't an easy way to do this with amarok right now.")
    def eject(self): print_info("There isn't an easy way to do this with amarok right now.")

    def _playlist_number(self, query):
        """Return the playlist value `query` as an int.

        The command-line back end returns text, so the value has to be
        converted before any arithmetic or comparison is done on it.
        """
        return int(self.get_playlist(query))

    def prev_n(self, n):
        """Go backwards 'n' times in the playlist, stopping at the first entry."""
        position = self._playlist_number('getActiveIndex')
        new_position = position - n
        if new_position < 0:
            new_position = 0
        self.set_playlist(new_position)

    def next_n(self, n):
        """Go forwards 'n' times in the playlist, stopping at the last entry."""
        position = self._playlist_number('getActiveIndex')
        playlist_length = self._playlist_number('getTotalTrackCount')
        new_position = position + n
        if new_position >= playlist_length:
            new_position = playlist_length - 1
        self.set_playlist(new_position)

    def get_info(self):
        """Return the info dictionary for the current track."""
        d = self._empty_dict()
        # this comes back in 'm:ss'
        d['elapsed'] = self.get_property('currentTime')
        d['title'] = self.get_property('title')
        d['artist'] = self.get_property('artist')
        d['album'] = self.get_property('album')
        d['length'] = self.get_property('totalTime')
        d['bitrate'] = '%s Kbps' % self.get_property('bitrate')
        return d
def current_player():
    """Return a new player object for the media player found by which().

    The class is looked up by capitalising the player key ('xmms' -> Xmms);
    'banshee1' is served by the Banshee class.

    Raises:
        Exception: when which() finds no supported player.
    """
    player = which()
    print_debug("detected %s is running" % player)
    # compare by value: "is" tests object identity and is not a reliable
    # string comparison
    if player == 'banshee1':
        player = 'banshee'
    if not player:
        raise Exception("Currently not running a supported media player.")
    # plain namespace lookup instead of eval() on a constructed expression
    player_class = globals()[player.capitalize()]
    return player_class()
def help(args):
    """Print the list of /mp3 sub-commands; `args` is accepted but unused."""
    lines = (
        "Commands:",
        " \002/mp3\002 : announce the currently playing mp3",
        " \002/mp3\002 \00303stop\003 : stop playing",
        " \002/mp3\002 \00303play\003 : start playing",
        " \002/mp3\002 \00303pause\003 : pause playback",
        " \002/mp3\002 \00303next [#]\003 : skip to next (# of) track(s)",
        " \002/mp3\002 \00303prev [#]\003 : skip to prev (# of) track(s)",
        " \002/mp3\002 \00303open\003 : open files",
        "",
    )
    for text in lines:
        weechat.prnt(text)
def usage():
    """Print the two-line usage summary for /mp3."""
    for text in ("Usage: \002/mp3\002 [cmd]",
                 "\t\002/mp3\002 \037help\037 for commands."):
        weechat.prnt(text)
def announce():
    """Send a '/me is listening to: ...' line describing the current player."""
    now_playing = current_player()
    message = '/me is listening to: %s' % (now_playing)
    weechat.command(message)
def stop(*args):
    """/mp3 stop: call stop() on the current player; arguments are ignored."""
    player = current_player()
    player.stop()
def play(*args):
    """/mp3 play: call play() on the current player; arguments are ignored."""
    player = current_player()
    player.play()
def pause(*args):
    """/mp3 pause: call pause() on the current player; arguments are ignored."""
    player = current_player()
    player.pause()
def open(*args):
    """/mp3 open: call open() on the current player; arguments are ignored."""
    player = current_player()
    player.open()
def eject(*args):
    """/mp3 eject: call eject() on the current player; arguments are ignored."""
    player = current_player()
    player.eject()
def _make_num(numstr):
try: return int(numstr)
except:
print_error('"%s" must be a number.' % numstr)
return None
def next(argv):
    """/mp3 next [#]: skip forward one track, or `#` tracks when given.

    `argv` is the split command line; a count is only read when there are
    exactly three tokens. Nothing happens if the count is not a number.
    """
    if len(argv) != 3:
        current_player().next()
        return
    count = _make_num(argv[2])
    if count is None:
        return
    current_player().next_n(count)
def prev(argv):
    """/mp3 prev [#]: go back one track, or `#` tracks when given.

    `argv` is the split command line; a count is only read when there are
    exactly three tokens. Nothing happens if the count is not a number.
    """
    if len(argv) != 3:
        current_player().prev()
        return
    count = _make_num(argv[2])
    if count is None:
        return
    current_player().prev_n(count)
#def dispatch(argv, arg_to_eol, c):
def dispatch(server, args):
    """WeeChat command handler for /mp3.

    Args:
        server: supplied by WeeChat; not used here.
        args: everything typed after '/mp3'.

    With no arguments the current track is announced; otherwise the first
    word selects the sub-command, which receives the full token list
    (['/mp3', <cmd>, ...]). Any failure prints the usage text, or the
    traceback when __debugging__ is on.

    Returns:
        Always 0.
    """
    # split() without a separator collapses runs of whitespace, so repeated
    # spaces between words no longer produce empty tokens.
    args = ("/mp3 " + args).split()
    print_debug(args)
    try:
        if len(args) == 1:
            announce()
        else:
            handlers = {
                "help" : help,
                "stop" : stop,
                "play" : play,
                "pause" : pause,
                "next" : next,
                "prev" : prev,
                "eject" : eject,
                "open" : open,
            }
            handlers[args[1]](args)
    except Exception:
        # Deliberately broad: an unknown sub-command or any player error
        # ends in the usage text rather than an error inside WeeChat.
        if __debugging__:
            print_debug(traceback.format_exc())
        else:
            usage()
    # eat in weechat is 0?
    return 0
# `weechat` is truthy only for the real module (the fake object is falsy).
if weechat:
    # running inside WeeChat: register the script and its /mp3 command
    weechat.register(__module_name__, __module_version__, '', __module_description__)
    weechat.add_command_handler('mp3', 'dispatch')
elif __name__ == '__main__':
    # run directly from a shell with the fake weechat: just announce once
    announce()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment