Created
August 4, 2016 05:44
-
-
Save francoismartin/289bcc6585ed7af3b0c3e5f19c668620 to your computer and use it in GitHub Desktop.
MPD Controller
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import re | |
import logging | |
import difflib | |
import mpd | |
import logging | |
parser = argparse.ArgumentParser() | |
logging.basicConfig() | |
# Ajoute les arguments de controle | |
parser.add_argument("-py", "--play", action="store_true", help="Joue la nouvelle musique designee") | |
parser.add_argument("-ps", "--pause", action="store_true", help="Met en pause la musique") | |
parser.add_argument("-st", "--stop", action="store_true", help="arrete la musique") | |
parser.add_argument("-nx", "--next", action="store_true", help="musique suivante") | |
parser.add_argument("-pv", "--previous", action="store_true", help="musique precedente") | |
parser.add_argument("-hg", "--higher", action="store_true", help="augmente le volume") | |
parser.add_argument("-lw", "--lower", action="store_true", help="baisse le volume") | |
parser.add_argument("-pl", "--playlist", help="Selectionne et joue une playlist defini") | |
parser.add_argument("-sg", "--song", help="Selectionne et joue un artiste / musique defini") | |
args = parser.parse_args() | |
# answer = args.square**2 | |
def reconnect(func, *default_args, **default_kwargs): | |
""" | |
Reconnects before running | |
""" | |
def wrap(self, *default_args, **default_kwargs): | |
try: | |
self.client.connect(self.server, self.port) | |
except: | |
pass | |
# sometimes not enough to just connect | |
try: | |
return func(self, *default_args, **default_kwargs) | |
except: | |
self.client = mpd.MPDClient() | |
self.client.timeout = None | |
self.client.idletimeout = None | |
self.client.connect(self.server, self.port) | |
return func(self, *default_args, **default_kwargs) | |
return wrap | |
class Song(object): | |
def __init__(self, id, title, artist, album): | |
self.id = id | |
self.title = title | |
self.artist = artist | |
self.album = album | |
class MPDWrapper(object): | |
def __init__(self, server="localhost", port=6600): | |
""" | |
Prepare the client and music variables | |
""" | |
self.server = server | |
self.port = port | |
# prepare client | |
self.client = mpd.MPDClient() | |
self.client.timeout = None | |
self.client.idletimeout = None | |
self.client.connect(self.server, self.port) | |
# gather playlists | |
self.playlists = [x["playlist"] for x in self.client.listplaylists()] | |
# gather songs | |
self.client.clear() | |
for playlist in self.playlists: | |
self.client.load(playlist) | |
self.songs = [] # may have duplicates | |
# capitalized strings | |
self.song_titles = [] | |
self.song_artists = [] | |
soup = self.client.playlist() | |
for i in range(0, len(soup) / 10): | |
index = i * 10 | |
id = soup[index].strip() | |
title = soup[index + 3].strip().upper() | |
artist = soup[index + 2].strip().upper() | |
album = soup[index + 4].strip().upper() | |
self.songs.append(Song(id, title, artist, album)) | |
self.song_titles.append(title) | |
self.song_artists.append(artist) | |
@reconnect | |
def play(self, songs=False, playlist_name=False): | |
""" | |
Plays the current song or accepts a song to play. | |
Arguments: | |
songs -- a list of song objects | |
playlist_name -- user-defined, something like "Love Song Playlist" | |
""" | |
if songs: | |
self.client.clear() | |
for song in songs: | |
try: # for some reason, certain ids don't work | |
self.client.add(song.id) | |
except: | |
pass | |
if playlist_name: | |
self.client.clear() | |
self.client.load(playlist_name) | |
self.client.play() | |
@reconnect | |
def current_song(self): | |
print(self.client.status()) | |
item = self.client.playlistinfo(int(self.client.status()["song"]))[0] | |
result = "%s by %s" % (item["title"], item["artist"]) | |
return result | |
@reconnect | |
def volume(self, level=None, interval=None): | |
if level: | |
self.client.setvol(int(level)) | |
return | |
if interval: | |
level = int(self.client.status()['volume']) + int(interval) | |
self.client.setvol(int(level)) | |
return | |
@reconnect | |
def pause(self): | |
self.client.pause() | |
@reconnect | |
def stop(self): | |
self.client.stop() | |
@reconnect | |
def next(self): | |
self.client.next() | |
return | |
@reconnect | |
def previous(self): | |
self.client.previous() | |
return | |
def get_soup(self): | |
""" | |
Returns the list of unique words that comprise song and artist titles | |
""" | |
soup = [] | |
for song in self.songs: | |
song_words = song.title.split(" ") | |
artist_words = song.artist.split(" ") | |
soup.extend(song_words) | |
soup.extend(artist_words) | |
title_trans = ''.join(chr(c) if chr(c).isupper() or chr(c).islower() | |
else '_' for c in range(256)) | |
soup = [x.decode('utf-8').encode("ascii", "ignore").upper().translate( | |
title_trans).replace("_", "") for x in soup] | |
soup = [x for x in soup if x != ""] | |
return list(set(soup)) | |
def get_soup_playlist(self): | |
""" | |
Returns the list of unique words that comprise playlist names | |
""" | |
soup = [] | |
for name in self.playlists: | |
soup.extend(name.split(" ")) | |
title_trans = ''.join(chr(c) if chr(c).isupper() or chr(c).islower() | |
else '_' for c in range(256)) | |
soup = [x.decode('utf-8').encode("ascii", "ignore").upper().translate( | |
title_trans).replace("_", "") for x in soup] | |
soup = [x for x in soup if x != ""] | |
return list(set(soup)) | |
def get_soup_separated(self): | |
""" | |
Returns the list of PHRASES that comprise song and artist titles | |
""" | |
title_soup = [song.title for song in self.songs] | |
artist_soup = [song.artist for song in self.songs] | |
soup = list(set(title_soup + artist_soup)) | |
title_trans = ''.join(chr(c) if chr(c).isupper() or chr(c).islower() | |
else '_' for c in range(256)) | |
soup = [x.decode('utf-8').encode("ascii", "ignore").upper().translate( | |
title_trans).replace("_", " ") for x in soup] | |
soup = [re.sub(' +', ' ', x) for x in soup if x != ""] | |
return soup | |
def fuzzy_songs(self, query): | |
""" | |
Returns songs matching a query best as possible on either artist | |
field, etc | |
""" | |
query = query.upper() | |
matched_song_titles = difflib.get_close_matches(query, | |
self.song_titles) | |
matched_song_artists = difflib.get_close_matches(query, | |
self.song_artists) | |
# if query is beautifully matched, then forget about everything else | |
strict_priority_title = [x for x in matched_song_titles if x == query] | |
strict_priority_artists = [ | |
x for x in matched_song_artists if x == query] | |
if strict_priority_title: | |
matched_song_titles = strict_priority_title | |
if strict_priority_artists: | |
matched_song_artists = strict_priority_artists | |
matched_songs_bytitle = [ | |
song for song in self.songs if song.title in matched_song_titles] | |
matched_songs_byartist = [ | |
song for song in self.songs if song.artist in matched_song_artists] | |
matches = list(set(matched_songs_bytitle + matched_songs_byartist)) | |
return matches | |
def fuzzy_playlists(self, query): | |
""" | |
returns playlist names that match query best as possible | |
""" | |
query = query.upper() | |
lookup = {n.upper(): n for n in self.playlists} | |
results = [lookup[r] for r in difflib.get_close_matches(query, lookup)] | |
return results | |
def start(args): | |
""" | |
Responds to user-input, typically speech text, by telling a joke. | |
Arguments: | |
text -- user-input, typically transcribed speech | |
mic -- used to interact with the user (for both input and output) | |
profile -- contains information related to the user (e.g., phone | |
number) | |
""" | |
# logger = logging.getLogger(__name__) | |
logger = logging.getLogger() | |
logger.setLevel(logging.DEBUG) | |
kwargs = {} | |
# if 'mpdclient' in profile: | |
# if 'server' in profile['mpdclient']: | |
# kwargs['server'] = profile['mpdclient']['server'] | |
# if 'port' in profile['mpdclient']: | |
# kwargs['port'] = int(profile['mpdclient']['port']) | |
kwargs['server'] = "localhost" | |
kwargs['port'] = 6600 | |
logger.debug("Preparing to start music module") | |
try: | |
mpdwrapper = MPDWrapper(**kwargs) | |
except: | |
logger.error("Couldn't connect to MPD server", exc_info=True) | |
# mic.say("I'm sorry. It seems that Spotify is not enabled. Please " + | |
# "read the documentation to learn how to configure Spotify.") | |
print("I'm sorry. It seems that Spotify is not enabled. Please " + | |
"read the documentation to learn how to configure Spotify.") | |
return | |
# mic.say("Please give me a moment, I'm loading your Spotify playlists.") | |
print("Please give me a moment, I'm loading your Spotify playlists.") | |
logger.debug("Starting music mode") | |
delegateArgs(mpdwrapper, args) | |
logger.debug("Exiting music mode") | |
return | |
def delegateArgs(mpdwrapper, args): | |
# check if input is meant to start the music module | |
# if "PLAYLIST" in command: | |
# command = command.replace("PLAYLIST", "") | |
if args.stop: | |
# self.mic.say("Stopping music") | |
print("Stopping music") | |
mpdwrapper.stop() | |
return | |
elif args.play: | |
# self.mic.say("Playing %s" % mpdwrapper.current_song()) | |
mpdwrapper.play() | |
print("Playing %s" % mpdwrapper.current_song()) | |
return | |
elif args.pause: | |
# self.mic.say("Pausing music") | |
print("Pausing music") | |
# not pause because would need a way to keep track of pause/play | |
# state | |
mpdwrapper.stop() | |
return | |
elif args.higher: | |
#self.mic.say("Louder") | |
print ("Louder") | |
mpdwrapper.volume(interval=10) | |
mpdwrapper.play() | |
return | |
elif args.lower: | |
# self.mic.say("Softer") | |
print ("Softer") | |
mpdwrapper.volume(interval=-10) | |
mpdwrapper.play() | |
return | |
elif args.next: | |
# self.mic.say("Next song") | |
print ("Next song") | |
mpdwrapper.play() # backwards necessary to get mopidy to work | |
mpdwrapper.next() | |
# self.mic.say("Playing %s" % mpdwrapper.current_song()) | |
print ("Playing %s" % mpdwrapper.current_song()) | |
return | |
elif args.previous: | |
# self.mic.say("Previous song") | |
mpdwrapper.play() # backwards necessary to get mopidy to work | |
mpdwrapper.previous() | |
# self.mic.say("Playing %s" % mpdwrapper.current_song()) | |
print ("Playing %s" % mpdwrapper.current_song()) | |
return | |
# SONG SELECTION... requires long-loading dictionary and language model | |
elif args.song: | |
songs = mpdwrapper.fuzzy_songs(query = args.song) | |
print(songs) | |
if songs: | |
#self.mic.say("Found songs") | |
print("Found songs") | |
mpdwrapper.play(songs = songs) | |
print("SONG RESULTS") | |
print("============") | |
for song in songs: | |
print("Song: %s Artist: %s" % (song.title, song.artist)) | |
#self.mic.say("Playing %s" % mpdwrapper.current_song()) | |
print("Playing %s" % mpdwrapper.current_song()) | |
else: | |
# self.mic.say("No songs found. Resuming current song.") | |
print("No songs found. Resuming current song.") | |
mpdwrapper.play() | |
# PLAYLIST SELECTION | |
elif args.playlist: | |
playlists = mpdwrapper.fuzzy_playlists(args.playlist) | |
print(playlists) | |
if playlists: | |
# self.mic.say("Loading playlist %s" % playlists[0]) | |
print("Loading playlist %s" % playlists[0]) | |
mpdwrapper.play(playlist_name=playlists[0]) | |
# self.mic.say("Playing %s" % mpdwrapper.current_song()) | |
print("Playing %s" % mpdwrapper.current_song()) | |
else: | |
# self.mic.say("No playlists found. Resuming current song.") | |
print("No playlists found. Resuming current song.") | |
mpdwrapper.play() | |
print(mpdwrapper.current_song()) | |
return | |
# Initialise l'appel | |
start(args) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment