Skip to content

Instantly share code, notes, and snippets.

@alyoshenka
Last active February 11, 2020 10:05
Show Gist options
  • Save alyoshenka/5b5402b5fbd970b2c06265a16bcbedec to your computer and use it in GitHub Desktop.
Save alyoshenka/5b5402b5fbd970b2c06265a16bcbedec to your computer and use it in GitHub Desktop.
music player
# runs string commands from functions
from enum import Enum
from musicQueue import *
from directoryManager import *
# commands
# shuffle {True/False}
# play/pause
# add {song} + by {artist} + on {album}
# skip (one song)
# song/current
# ToDo
# elapsed -> song time
# remaining -> song time
# remove
# remove by artist, album (multiple arguments??)
# reset/clear
# TODO: this module needs substantial refactoring; much of the code
# could be designed far better
# and should be rewritten in idiomatic Python rather than hacked together
class AudioType(Enum):
    """Granularity of a library entry, from a whole playlist down to one song."""

    Playlist = 0
    Artist = 1
    Album = 2
    Song = 3
# splits input by space
def parse_input(inp):
    """Split a raw command line into its words.

    Splits on runs of whitespace, so leading/trailing blanks and repeated
    spaces never produce empty words, and an empty line yields ``[]``.

    Args:
        inp: The text the user typed.

    Returns:
        A list of the non-empty, whitespace-separated words in ``inp``.
    """
    return inp.split()
# 0 = success
# -1 = fail, exit
# 1 = fail, send to other interpreter
# maps array of string arguments to a command
def map_command(args):
    """Run the queue/library command described by a list of words.

    Recognised forms:

    * ``queue`` / ``show``                    - print the queue
    * ``shuffle`` / ``shuffle on|off``        - set shuffle mode
    * ``add X``                               - queue the first match for X
    * ``add X by ARTIST``                     - same, restricted by artist
    * ``add X on ALBUM``                      - queue song X from ALBUM
    * ``add X by ARTIST on ALBUM`` (either qualifier order)
    * ``rem X`` / ``remove X``                - drop matching queue entries

    Args:
        args: The command split into words.

    Returns:
        0 if the command was handled here, 1 if it is not a queue command
        and should be offered to the player's interpreter, -1 if it is
        malformed.
    """
    size = len(args)
    if size == 0:
        print("No arguments given")
        return -1

    arg0 = args[0]

    if size == 1:
        if arg0 in ("queue", "show"):
            show_queue(fullName=True)
        elif arg0 == "shuffle":
            set_shuffle(True)
        else:
            # stop/exit are silently forwarded; anything else is reported first
            if arg0 not in ("stop", "exit"):
                print("CommandMapper unknown argument: " + arg0)
            return 1
        return 0

    arg1 = args[1]

    if size == 2:
        if arg0 == "shuffle":
            if arg1 == "on":
                set_shuffle(True)
            elif arg1 == "off":
                set_shuffle(False)
            else:
                print("unknown argument: " + arg1)
        elif arg0 == "add":
            _queue_search_result(search(arg1), arg1)
        elif arg0 in ("rem", "remove"):
            remove_files(arg1)
        elif arg0 == "vol":
            return 1
        else:
            print("unknown argument: " + arg0)
            return -1
        return 0

    if size == 3:
        print("invalid argument: " + args[2])
        return -1

    if size == 4:
        if arg0 != "add":
            print("unknown argument: " + arg0)
            return -1
        keyword, qualifier = args[2], args[3]
        if keyword == "on":
            found = search(arg1, audioType=AudioType.Song, _album=qualifier)
        elif keyword == "by":
            found = search(arg1, _artist=qualifier)
        else:
            print("unknown argument: " + keyword)
            return -1
        _queue_search_result(found, arg1)
        return 0

    if size == 5:
        print("invalid argument: " + args[4])
        return -1

    if size == 6:
        if arg0 != "add":
            print("unknown argument: " + arg0)
            return -1
        # words 2/4 are the keywords, words 3/5 their values, in either order
        qualifiers = {args[2]: args[3], args[4]: args[5]}
        if set(qualifiers) != {"by", "on"}:
            print("invalid arguments")
            return -1
        found = search(
            arg1,
            audioType=AudioType.Song,
            _artist=qualifiers["by"],
            _album=qualifiers["on"],
        )
        _queue_search_result(found, arg1)
        return 0

    print("too many arguments")
    return -1


def _queue_search_result(found, name):
    """Queue a ``search`` hit, or report that ``name`` matched nothing."""
    if found is None:
        print("found no instance of " + name)
        return
    audio_type, location = found
    add(location, audio_type)
# search for a target string
def search(target, audioType=None, _playlist=None, _artist=None, _album=None):
    """Find the first library entry whose name contains ``target``.

    All matching is case-insensitive substring matching. The library is
    read as ``<root>/<artist>/<album>/<song>`` with playlists under
    ``<root>/_Playlists/``.

    Args:
        target: Text to look for in the entry's name.
        audioType: ``AudioType`` level to search. ``None`` tries each
            applicable level from broadest to narrowest.
        _playlist: If not ``None``, forces a playlist search.
        _artist: If given, only artist directories containing this text
            are examined.
        _album: If given, only album directories containing this text are
            examined.

    Returns:
        ``[AudioType, path]`` for the first match, or ``None`` if nothing
        matched.

    Raises:
        ValueError: ``audioType`` is not ``None`` or an ``AudioType`` member.
    """
    root = music_dir()
    playlist_dir_name = "_Playlists"

    if _playlist is not None:
        audioType = AudioType.Playlist

    target = target.lower()
    artist_filter = None if _artist is None else _artist.lower()
    album_filter = None if _album is None else _album.lower()

    if audioType is None:
        # A qualifier narrows what the target can be: "on ALBUM" implies a
        # song, "by ARTIST" implies an album or a song.
        if album_filter is not None:
            levels = (AudioType.Song,)
        elif artist_filter is not None:
            levels = (AudioType.Album, AudioType.Song)
        else:
            levels = (
                AudioType.Playlist,
                AudioType.Artist,
                AudioType.Album,
                AudioType.Song,
            )
        for level in levels:
            found = search(target, level, _artist=_artist, _album=_album)
            if found is not None:
                return found
        return None

    if audioType == AudioType.Playlist:
        playlist_root = root + playlist_dir_name + "/"
        for playlist in _dir_entries(playlist_root):
            if target in playlist.lower():
                return [AudioType.Playlist, playlist_root + playlist]
        return None

    if audioType not in (AudioType.Artist, AudioType.Album, AudioType.Song):
        raise ValueError("invalid audiotype: " + str(audioType))

    for artist in _dir_entries(root):
        if artist == playlist_dir_name:
            continue  # the playlist folder is not an artist
        if artist_filter is not None and artist_filter not in artist.lower():
            continue
        artist_dir = root + artist + "/"
        if audioType == AudioType.Artist:
            if target in artist.lower():
                return [AudioType.Artist, artist_dir]
            continue

        for album in _dir_entries(artist_dir):
            if album_filter is not None and album_filter not in album.lower():
                continue
            album_dir = artist_dir + album + "/"
            if audioType == AudioType.Album:
                if target in album.lower():
                    return [AudioType.Album, album_dir]
                continue

            for song in _dir_entries(album_dir):
                if target in song.lower():
                    return [AudioType.Song, album_dir + song]
    return None


def _dir_entries(path):
    """Return ``dir_content(path)``, or ``[]`` if ``path`` is missing or is a file."""
    try:
        return dir_content(path)
    except (FileNotFoundError, NotADirectoryError):
        return []
from omxplayer import OMXPlayer
# show data about the given player
def show_player_data(player):
    """Print the player's file name, playing flag, duration and volume.

    Args:
        player: Object providing ``get_filename()``, ``is_playing()``,
            ``duration()`` and ``volume()``.
    """
    separator = "---"
    print()
    print(separator)
    print(f"Name: {player.get_filename()!s}")
    print(f"Playing: {player.is_playing()!s}")
    print("Duration: " + to_minutes(player.duration()))
    print(f"Volume: {player.volume()!s}")
    print(separator)
    print()
# show simple data about the player
def show_player_data_simple(player):
    """Print only the name of the song the player currently has loaded."""
    song = filename_to_song(str(player.get_filename()))
    print("\n---\nPlaying: " + song + "\n---\n")
# convert full filename into song name
def filename_to_song(fl):
    """Return whatever follows the last "/" in ``fl`` (all of it if there is none)."""
    _, _, tail = str(fl).rpartition("/")
    return tail
# convert seconds into minutes
def to_minutes(sec):
    """Format a duration in seconds as ``"<M>m, <S>s"``.

    The value is rounded to whole seconds before it is split, so the
    seconds part is always 0-59 (59.7 s gives "1m, 0s", not "0m, 60s").

    Args:
        sec: Duration in seconds (int or float).

    Returns:
        The formatted string.
    """
    minutes, seconds = divmod(round(sec), 60)
    return str(minutes) + "m, " + str(seconds) + "s"
# manages pulling files from correct directories
import os
from musicQueue import *
import commandMapper
# Root of the music library. Paths in this module are built by string
# concatenation, so the trailing slash is required.
musicDir = "/home/jay/Music/"


def music_dir():
    """Return the root directory of the music library."""
    return musicDir
# gets the contents of the current directory
def dir_content(loc):
    """Return the names of the entries in directory ``loc`` as a sorted list."""
    return sorted(os.listdir(loc))
def add(loc, audioType):
    """Queue whatever lives at ``loc``, using the adder that matches ``audioType``.

    Args:
        loc: Absolute path of the playlist, artist, album or song.
        audioType: The ``commandMapper.AudioType`` member describing ``loc``.

    Raises:
        Exception: ``audioType`` is not one of the four known members.
    """
    if loc is None or audioType is None:
        print("invalid add arguments")
        return

    kinds = commandMapper.AudioType
    handlers = (
        (kinds.Playlist, add_playlist),
        (kinds.Artist, add_artist),
        (kinds.Album, add_album),
        (kinds.Song, add_song),
    )
    for kind, handler in handlers:
        if audioType == kind:
            handler(loc)
            return
    raise Exception("invalid audioType")
def add_next(path, audioType):
    """Placeholder for "add to the front of the queue"; not implemented, always asserts."""
    assert False
# add song, absolute directory
def add_song(loc):
    """Queue the single song file at absolute path ``loc``."""
    add_file(loc)
# add album, absolute directory
def add_album(loc):
    """Queue every entry of the album directory ``loc`` in sorted order.

    ``loc`` must end with "/" because entry names are appended to it directly.
    """
    for track in dir_content(loc):
        add_file(loc + track)
# add artist, absolute directory
def add_artist(loc):
    """Queue every album found in the artist directory ``loc``.

    ``loc`` must end with "/" because album names are appended to it directly.
    """
    for album_name in dir_content(loc):
        add_album(loc + album_name + "/")
# add playlist, absolute directory
def add_playlist(loc):
    """Placeholder for queuing a playlist; not implemented, always asserts."""
    assert False
# main execution loop
# external
from time import sleep
from songPlayer import SongPlayer
# internal
from musicQueue import *
from myExceptions import *
from debugMessages import *
from commandMapper import *
# start program
def start():
    """Announce start-up and preload the queue with hard-coded test material."""
    print("MusicPi started")
    # open ideas: play a start-up sound? check for an audio device?
    # set_shuffle(False)

    # add test files
    example = "/home/jay/MusicPi/example.mp3"
    for _ in range(2):
        add_file(example)
    add_artist("/home/jay/Music/TheXX/")
    # the player itself is initialised by the caller
# stop program
def stop():
    """Announce that the program has stopped."""
    print("MusicPi stopped")
# main main for input
def main():
    """Run the interactive command loop.

    Each line read from standard input is first offered to ``map_command``.
    When that returns 1 the same words are forwarded to the ``SongPlayer``.
    The loop ends cleanly on end-of-input or Ctrl-C, after which the player
    is stopped and the shutdown message is printed. The "stop"/"exit"
    commands leave through the player's own exit path instead.
    """
    start()
    songThread = SongPlayer(None)
    try:
        while True:
            args = parse_input(input())
            if map_command(args) == 1:
                songThread.acceptarg0ut(args)  # not a queue command
    except (EOFError, KeyboardInterrupt):
        pass  # no more input: fall through to shutdown
    songThread.stop()
    stop()


if __name__ == "__main__":
    main()
from myExceptions import *
from debugMessages import *
from random import randint
# pending songs as file paths, in play order; mutated in place by the
# add/remove/pull functions below
queue = []
# when True, next_index() picks a random queue position instead of the front
shuffle = False
# add file to queue
def add_file(fl):
    """Append ``fl`` to the end of the queue if ``validate_file`` accepts it."""
    if not validate_file(fl):
        print("Invalid file: " + fl)
        return
    queue.append(fl)
    print(f"Added {fl!s} to spot {len(queue)!s}")
# remove matching files from queue
def remove_files(fl):
    """Delete every queue entry whose path contains ``fl``, ignoring case.

    Each removed entry is reported by its song name.
    """
    needle = fl.lower()
    position = 0
    # only advance when nothing was deleted: a deletion shifts the next
    # entry into the current position
    while position < len(queue):
        entry = queue[position]
        if needle in entry.lower():
            del queue[position]
            print(filename_to_song(entry) + " removed")
        else:
            position += 1
# add file to next position
def add_file_next(fl):
    """Insert ``fl`` at the front of the queue so it is the next one pulled.

    The file is validated the same way ``add_file`` validates it; invalid
    files are reported and not queued. With shuffle on, the front of the
    queue is not necessarily what plays next.

    Args:
        fl: Path of the file to queue.
    """
    if not validate_file(fl):
        print("Invalid file: " + str(fl))
        return
    # list.insert takes the index first, then the item
    queue.insert(0, fl)
    print("Added " + str(fl) + " to next spot in queue")
# add all contents of folder to queue
def add_folder(dirLoc):
    """Placeholder for queuing a whole folder; not implemented, always asserts."""
    assert False, "Not Yet Implemented"
# add all contents of folder to beginning of queue
def add_folder_next(dirLoc):
    """Placeholder for queuing a folder at the front; not implemented, always asserts."""
    assert False, "Not Yet Implemented"
# show the next song in the queue
def show_next():
    """Print the entry at the front of the queue.

    Raises:
        NoMoreSongsError: The queue is empty.
    """
    if not queue:
        raise NoMoreSongsError()
    print(queue[0])
# pull the next song out of the queue
def pull_next():
    """Remove and return the entry at the position chosen by ``next_index``.

    Raises:
        NoMoreSongsError: The queue is empty.
    """
    if not queue:
        raise NoMoreSongsError()
    return queue.pop(next_index())
# gets the index of the next song
def next_index():
    """Return the queue position to play next.

    Position 0 when shuffle is off. With shuffle on, a random valid
    position, which is also printed together with the queue length.
    """
    if not shuffle:
        return 0
    size = get_length_items()
    pick = randint(0, size - 1)
    print(str(pick) + " " + str(size))
    return pick
# prints the queue
def show_queue(fullName=False):
    """Print the queue length followed by one numbered line per entry.

    Args:
        fullName: Print the stored path as-is when True, otherwise only
            the song name.
    """
    print("Length: " + str(len(queue)))
    for number, entry in enumerate(queue, start=1):
        label = entry if fullName else filename_to_song(entry)
        print(str(number) + ": " + label)
# gets the length of the queue, in songs
def get_length_items():
    """Return how many entries are currently queued."""
    return len(queue)
# gets the length of the queue, in time
def get_length_time():
    """Placeholder for the queue's total playing time; not implemented, always asserts."""
    assert False, "Not Yet Implemented"
# toggle shuffle
def set_shuffle(shouldShuffle):
    """Store ``shouldShuffle`` as the module-wide shuffle flag and print the new value."""
    global shuffle
    shuffle = shouldShuffle
    print(f"Shuffle: {shuffle!s}")
# check for valid file extension
def validate_file(fl):
    """Report whether ``fl`` names an MP3 file, judged by its extension only.

    The comparison ignores case, so ``SONG.MP3`` is accepted. ``fl`` is
    converted with ``str`` first, so path objects work too.

    Args:
        fl: File name or path.

    Returns:
        True if the name ends in ".mp3", otherwise False.
    """
    return str(fl).lower().endswith(".mp3")  # expand later to accept more filetypes(?)
def is_empty():
    """Return True when nothing is queued."""
    return not get_length_items()
class NoMoreSongsError(Exception):
    """Signals that a song was requested from an empty queue."""
#!/bin/bash
# One-time setup: installs what the player needs and grants video access.
# Stop at the first failing step instead of carrying on half-installed.
set -e

# download dependencies
sudo apt-get install libdbus-1-dev
pip3 install omxplayer-wrapper
# download example sound
#wget http://rpf.io/lamp3 -O example.mp3 --no-check-certificate
# allow video access
# -a appends to the user's groups; -G alone would replace every
# supplementary group the user already has with just "video"
sudo usermod -a -G video jay
from musicQueue import *
from myExceptions import *
from debugMessages import *
from commandMapper import *
from omxplayer.player import *
from pathlib import Path
from threading import Thread
from time import sleep
# definitions
# extra arguments handed to every OMXPlayer this module creates
outputArgs = ['-o', 'local']
# pause between iterations of the background play loop
loopSleepTime = 1 # second
# step used when the volume is raised or lowered by command
volIncr = 0.1
# [lowest, highest] volume bounds consulted by the volume handler
volLim = [0, 1]
class SongPlayer(object):
    """Plays the queue through OMXPlayer and interprets playback commands.

    ``self.instance`` is the current OMXPlayer, or ``None`` when nothing is
    loaded. A daemon thread runs ``play_main`` to start the next song when
    the current player has died or none is loaded.
    """

    # one-word commands that need a loaded player
    _PLAYER_COMMANDS = (
        "current", "song", "play", "pause", "mute", "unmute", "dur", "duration",
    )

    def __init__(self, instance):
        """Start the first queued song (if any) and launch the watcher thread.

        Args:
            instance: Unused. Accepted only so existing ``SongPlayer(None)``
                calls keep working.
        """
        self.instance = self.init_instance()
        thread = Thread(target=self.play_main)
        thread.daemon = True
        thread.start()

    def __del__(self):
        """Best-effort cleanup: stop the player and print the shutdown message."""
        self.stop()
        try:
            from main import stop
        except ImportError:
            return  # main is not importable here; nothing more to do
        stop()

    def stop(self):
        """Stop the current player, if there is one."""
        # getattr: __del__ may run on an object whose __init__ never finished
        player = getattr(self, "instance", None)
        if player is None:
            print("null instance on del")
            return
        try:
            player.stop()
        except OMXPlayerDeadError:
            pass  # already gone; stopping it again is a no-op

    def play_next(self):
        """Advance to the next queued song.

        Afterwards ``self.instance`` is a player for the new song, or
        ``None`` if the queue was empty or the song could not be started.
        The song is pulled from the queue exactly once, even when the
        existing player cannot be reused.
        """
        print("playing next")
        try:
            path = Path(pull_next())
        except NoMoreSongsError:
            print("No more songs in queue")
            self.instance = None
            return

        if self._reload(path):
            self._announce(self.instance)
        else:
            self.instance = self._open(path)

    def init_instance(self):
        """Pull the next song and return a new playing OMXPlayer.

        Returns:
            The player, or ``None`` if the queue is empty or the player
            could not be created (the error is printed).
        """
        try:
            path = Path(pull_next())
        except NoMoreSongsError:
            print("No more songs in queue")
            return None
        return self._open(path)

    def _reload(self, path):
        """Try to switch the existing player to ``path``; return True on success."""
        player = self.instance
        if player is None:
            print("null instance")
            return False
        try:
            player.stop()
            player.load(path)
            player.play()
        except Exception:
            # a player that has exited cannot be reused; the caller opens a
            # fresh one for the same path instead
            return False
        return True

    def _open(self, path):
        """Create and start an OMXPlayer for ``path``; return None if that fails."""
        try:
            player = OMXPlayer(path, args=outputArgs)
            player.play()
        except Exception as e:
            print(e)
            return None
        self._announce(player)
        return player

    def _announce(self, player):
        """Print the player's details without letting a display error stop playback."""
        try:
            show_player_data(player)
        except Exception as e:
            print(e)

    def acceptarg0ut(self, args):
        """Run a playback command.

        Recognised forms: ``stop``/``exit``, ``skip``, ``current``/``song``,
        ``play``, ``pause``, ``mute``, ``unmute``, ``dur``/``duration`` and
        ``vol <+|up|-|dwn|down|number>``.

        NOTE(review): the method name looks like a botched rename of
        ``accept_input``. It is kept because callers use it; the alias
        ``accept_input`` is provided below.

        Args:
            args: The command split into words.

        Returns:
            -1 for an empty command, otherwise ``None``.

        Raises:
            SystemExit: On ``stop`` / ``exit``, after stopping the player.
        """
        size = len(args)
        if size == 0:
            return -1
        if size > 2:
            print("too many SongPlayer arguments")
            return

        arg0 = args[0]

        if size == 2:
            if arg0 == "vol":
                self.volume_handler(args[1])
            else:
                print("unknown SongPlayer command: " + arg0)
            return

        if arg0 in ("stop", "exit"):
            self.stop()
            raise SystemExit
        if arg0 == "skip":
            self.play_next()  # already starts playback of the new song
            return
        if arg0 not in self._PLAYER_COMMANDS:
            print("unknown SongPlayer command: " + arg0)
            return

        player = self.instance
        if player is None:
            print("null instance")
            return
        try:
            if arg0 in ("current", "song"):
                print(filename_to_song(player.get_filename()))
            elif arg0 == "play":
                player.play()
                print("Play")
            elif arg0 == "pause":
                player.pause()
                print("Pause")
            elif arg0 == "mute":
                player.mute()
            elif arg0 == "unmute":
                player.unmute()
            else:  # "dur" / "duration"
                print(to_minutes(player.duration()))
        except OMXPlayerDeadError:
            print("OMXPlayer dead")
        return

    # readable alias for the oddly named method above
    accept_input = acceptarg0ut

    def volume_handler(self, arg):
        """Change the volume.

        Args:
            arg: ``"+"``/``"up"`` or ``"-"``/``"dwn"``/``"down"`` to move one
                ``volIncr`` step (clamped to ``volLim``), or a number within
                ``volLim`` to set the volume directly.
        """
        player = self.instance
        if player is None:
            print("null instance")
            return

        low, high = volLim
        if arg in ("+", "up"):
            target = min(player.volume() + volIncr, high)
        elif arg in ("-", "dwn", "down"):
            target = max(player.volume() - volIncr, low)
        else:
            try:
                target = float(arg)
            except ValueError:
                print("unknown SongPlayer command: " + arg)
                return
            # also rejects NaN, for which every comparison is False
            if not low <= target <= high:
                print("volume must be between " + str(low) + " and " + str(high))
                return
        player.set_volume(target)

    def play_main(self):
        """Body of the watcher thread: run ``play_loop`` forever, pausing between runs."""
        while True:
            try:
                self.play_loop()
            except Exception as e:  # keep the watcher alive whatever happens
                print(e)
            sleep(loopSleepTime)

    def play_loop(self):
        """One watcher pass: start the next song if nothing is loaded or the player died."""
        player = self.instance
        try:
            if player is None:
                if get_length_items() == 0:
                    print("waiting for more songs")
                else:
                    print("playing next because null instance")
                    self.play_next()
            elif not player.is_playing():
                print("not playing")
            # else playing
        except OMXPlayerDeadError:
            print("OMXPlayer dead")
            # forget the dead player so commands do not keep hitting it
            self.instance = None
            if not is_empty():
                self.play_next()
            else:
                print("waiting for songs")  # add counter?
        except Exception as e:
            print("PlayLoop")
            print(e)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment