Last active
February 11, 2020 10:05
-
-
Save alyoshenka/5b5402b5fbd970b2c06265a16bcbedec to your computer and use it in GitHub Desktop.
music player
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
*.mp3 | |
*.pyc |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# runs string commands from functions | |
from enum import Enum | |
from musicQueue import * | |
from directoryManager import * | |
# commands | |
# shuffle {True/False} | |
# play/pause | |
# add {song} + by {artist} + on {album} | |
# skip (one song) | |
# song/current | |
# ToDo | |
# elapsed -> song time | |
# remaining -> song time | |
# remove | |
# remove by artist, album (multiple arguments??) | |
# reset/clear | |
# SO SO much refactoring because most of this code is shit | |
# and can be designed much better | |
# also actually learn python don't just hack it | |
class AudioType(Enum):
    """Kinds of library entries a search result can refer to.

    The members are compared by identity/equality elsewhere in the project,
    so their names and numeric values must stay as they are.
    """

    Playlist = 0  # an entry of the playlists folder
    Artist = 1    # a top-level artist folder
    Album = 2     # an album folder inside an artist folder
    Song = 3      # a single file inside an album folder
# splits input by space | |
def parse_input(inp):
    """Split a raw command line into its whitespace-separated arguments.

    Splitting on any run of whitespace (instead of on single spaces) means
    repeated, leading or trailing blanks no longer produce empty-string
    arguments, and an empty line yields an empty list.

    Args:
        inp: the line typed by the user.

    Returns:
        A list of non-empty argument strings; ``[]`` for a blank line.
    """
    return inp.split()
# 0 = success | |
# -1 = fail, exit | |
# 1 = fail, send to other interpreter | |
# maps array of string arguments to a command | |
def _add_search_result(ret, target):
    """Queue a ``search`` result, or report that *target* was not found."""
    if ret is None:
        print("found no instance of " + target)
    else:
        # search returns [AudioType, path]; add expects (path, AudioType)
        add(ret[1], ret[0])


def map_command(args):
    """Map a list of string arguments onto a queue command and run it.

    Supported shapes::

        queue | show | shuffle | stop | exit
        shuffle on|off
        add X | rem X | remove X | vol X
        add X by ARTIST | add X on ALBUM
        add X by ARTIST on ALBUM   (the two qualifiers may come in either order)

    Args:
        args: the already-split command words.

    Returns:
        0 when the command was handled here, -1 when it is invalid, and 1 when
        it is not a queue command and should be sent to the other interpreter.
    """
    size = len(args)
    if size == 0:
        print("No arguments given")
        return -1

    arg0 = args[0]
    if size == 1:
        if arg0 in ("queue", "show"):
            show_queue(fullName=True)
        elif arg0 in ("stop", "exit"):
            return 1  # handled by the other interpreter
        elif arg0 == "shuffle":
            set_shuffle(True)
        else:
            print("CommandMapper unknown argument: " + arg0)
            return 1
        return 0

    arg1 = args[1]
    if size == 2:
        if arg0 == "shuffle":
            if arg1 == "on":
                set_shuffle(True)
            elif arg1 == "off":
                set_shuffle(False)
            else:
                print("unknown argument: " + arg1)
        elif arg0 == "add":  # check for everything
            _add_search_result(search(arg1), arg1)
        elif arg0 in ("rem", "remove"):
            remove_files(arg1)
        elif arg0 == "vol":
            return 1
        else:
            print("unknown argument: " + arg0)
            return -1
        return 0

    # A qualifier keyword always needs a value, so odd lengths are incomplete.
    if size in (3, 5):
        print("invalid argument: " + args[size - 1])
        return -1

    if size > 6:
        print("too many arguments")
        return -1

    # Only "add" takes qualifiers; previously any first word was accepted.
    if arg0 != "add":
        print("unknown argument: " + arg0)
        return -1

    if size == 4:
        qualifier, value = args[2], args[3]
        if qualifier == 'on':
            ret = search(arg1, audioType=AudioType.Song, _album=value)
        elif qualifier == 'by':
            ret = search(arg1, _artist=value)
        else:
            print("unknown argument: " + qualifier)
            return -1
        _add_search_result(ret, arg1)
        return 0

    # size == 6: exactly one 'by' and one 'on', each followed by its value.
    qualifiers = {args[2]: args[3], args[4]: args[5]}
    if set(qualifiers) != {'by', 'on'}:
        print("invalid arguments")
        return -1
    ret = search(
        arg1,
        audioType=AudioType.Song,
        _artist=qualifiers['by'],
        _album=qualifiers['on'],
    )
    _add_search_result(ret, arg1)
    return 0
# search for a target string | |
def search(target, audioType=None, _playlist=None, _artist=None, _album=None):
    """Find the first library entry whose name contains *target*.

    Matching is a case-insensitive substring test. Entries are visited in the
    order ``dir_content`` returns them, and the first hit wins.

    Args:
        target: text to look for.
        audioType: the ``AudioType`` to search; ``None`` tries every level
            that is still possible given the qualifiers.
        _playlist: when not ``None`` the search is forced to playlists.
        _artist: only look inside artist folders whose name contains this.
        _album: only look inside album folders whose name contains this.

    Returns:
        ``[AudioType, path]`` for the first match, or ``None`` when nothing
        matches.

    Raises:
        Exception: if *audioType* is not ``None`` or an ``AudioType`` member.
    """
    root = music_dir()
    playlist_folder = "_Playlists"

    def entries(path):
        # Loose files and missing folders simply have no children.
        try:
            return dir_content(path)
        except (FileNotFoundError, NotADirectoryError):
            return []

    def accepts(wanted, name):
        return wanted is None or wanted.lower() in name.lower()

    def artist_names(wanted=None):
        # The playlist folder lives next to the artists but is not one.
        return [
            name for name in entries(root)
            if name != playlist_folder and accepts(wanted, name)
        ]

    def artist_albums():
        for artist in artist_names(_artist):
            for album in entries(root + artist + "/"):
                if accepts(_album, album):
                    yield artist, album

    # set type based on arguments
    if _playlist is not None:
        audioType = AudioType.Playlist

    target = target.lower()

    if audioType is None:  # check everything that can still match
        if _album is not None:
            # "X on ALBUM": X can only be a song.
            levels = (AudioType.Song,)
        elif _artist is not None:
            # "X by ARTIST": X is an album or a song, not a playlist/artist.
            levels = (AudioType.Album, AudioType.Song)
        else:
            levels = (
                AudioType.Playlist,
                AudioType.Artist,
                AudioType.Album,
                AudioType.Song,
            )
        for level in levels:
            found = search(target, level, _artist=_artist, _album=_album)
            if found is not None:
                return found
        return None

    if audioType == AudioType.Playlist:
        folder = root + playlist_folder + "/"
        for playlist in entries(folder):
            if target in playlist.lower():
                return [AudioType.Playlist, folder + playlist]
    elif audioType == AudioType.Artist:
        for artist in artist_names():
            if target in artist.lower():
                return [AudioType.Artist, root + artist + "/"]
    elif audioType == AudioType.Album:
        for artist, album in artist_albums():
            if target in album.lower():
                return [AudioType.Album, root + artist + "/" + album + "/"]
    elif audioType == AudioType.Song:
        for artist, album in artist_albums():
            album_path = root + artist + "/" + album + "/"
            for song in entries(album_path):
                if target in song.lower():
                    return [AudioType.Song, album_path + song]
    else:
        raise Exception("invalid audiotype: " + str(audioType))

    return None
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from omxplayer import OMXPlayer | |
# show data about the given player | |
def show_player_data(player):
    """Print name, play state, duration and volume of *player*.

    Each value is fetched right before its line is printed, so a failing
    player call leaves the earlier lines on screen.
    """
    fields = (
        ("Name: ", lambda: str(player.get_filename())),
        ("Playing: ", lambda: str(player.is_playing())),
        ("Duration: ", lambda: to_minutes(player.duration())),
        ("Volume: ", lambda: str(player.volume())),
    )
    print()
    print("---")
    for label, fetch in fields:
        print(label + fetch())
    print("---")
    print()
# show simple data about the player | |
def show_player_data_simple(player):
    """Print only the name of the song *player* has loaded."""
    song = filename_to_song(str(player.get_filename()))
    print("\n---\nPlaying: " + song + "\n---\n")
# convert full filename into song name | |
def filename_to_song(fl):
    """Return the part of *fl* after its last ``/`` (the bare file name)."""
    _, _, name = str(fl).rpartition("/")
    return name
# convert seconds into minutes | |
def to_minutes(sec):
    """Format a duration in seconds as ``"<m>m, <s>s"``.

    The value is rounded to whole seconds *before* it is split, so the seconds
    part is always in 0-59 (rounding the remainder on its own could print
    ``"0m, 60s"`` for 59.6 seconds).

    Args:
        sec: duration in seconds (int or float).

    Returns:
        The formatted string, e.g. ``"2m, 5s"``.
    """
    minutes, seconds = divmod(int(round(sec)), 60)
    return str(minutes) + "m, " + str(seconds) + "s"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# manages pulling files from correct directories | |
import os | |
from musicQueue import * | |
import commandMapper | |
# Root folder of the music library. Other code builds paths by plain string
# concatenation, so the trailing slash matters.
musicDir = "/home/jay/Music/"


def music_dir():
    """Return the root folder of the music library."""
    return musicDir
# gets the contents of the current directory | |
def dir_content(loc):
    """Return the names of the entries in directory *loc*, sorted."""
    return sorted(os.listdir(loc))
def add(loc, audioType):
    """Queue whatever *loc* points at, using the adder for *audioType*.

    Args:
        loc: absolute path of the playlist, artist, album or song.
        audioType: the ``commandMapper.AudioType`` member describing *loc*.

    Raises:
        Exception: if *audioType* is not one of the known members.
    """
    if loc is None or audioType is None:
        print("invalid add arguments")
        return

    kinds = commandMapper.AudioType
    handlers = (
        (kinds.Playlist, add_playlist),
        (kinds.Artist, add_artist),
        (kinds.Album, add_album),
        (kinds.Song, add_song),
    )
    for kind, handler in handlers:
        if audioType == kind:
            handler(loc)
            return
    raise Exception("invalid audioType")
def add_next(path, audioType):
    """Queue *path* so that it plays next (not implemented yet).

    Raises:
        NotImplementedError: always. An explicit raise is used instead of
            ``assert False`` because asserts are stripped under ``python -O``,
            which would turn this stub into a silent no-op.
    """
    raise NotImplementedError("add_next is not implemented yet")
# add song, absolute directory | |
def add_song(loc):
    """Queue the single song file at the absolute path *loc*."""
    add_file(loc)
# add album, absolute directory | |
def add_album(loc):
    """Queue every entry of the album folder *loc*.

    *loc* is an absolute path that must end with ``/`` because the entry name
    is appended to it directly.
    """
    for track in dir_content(loc):
        add_file(loc + track)
# add artist, absolute directory
def add_artist(loc):
    """Queue every album found in the artist folder *loc*.

    *loc* is an absolute path that must end with ``/``; each entry is treated
    as an album folder and handed to ``add_album``.
    """
    for album_name in dir_content(loc):
        add_album(loc + album_name + "/")
# add playlist, absolute directory | |
def add_playlist(loc):
    """Queue the playlist at the absolute path *loc* (not implemented yet).

    Raises:
        NotImplementedError: always. An explicit raise is used instead of
            ``assert False`` because asserts are stripped under ``python -O``,
            which would make adding a playlist silently do nothing.
    """
    raise NotImplementedError("add_playlist is not implemented yet")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# main execution loop | |
# external | |
from time import sleep | |
from songPlayer import SongPlayer | |
# internal | |
from musicQueue import * | |
from myExceptions import * | |
from debugMessages import * | |
from commandMapper import * | |
# start program | |
def start():
    """Announce start-up and preload the queue with test material."""
    print("MusicPi started")
    # sound?
    # check for audio device?
    # set_shuffle(False)

    # add test files
    # NOTE(review): the original indentation was lost; this assumes only
    # add_file sits inside the loop and the artist is added once - confirm.
    example = "/home/jay/MusicPi/example.mp3"
    for _ in range(2):
        add_file(example)
    add_artist("/home/jay/Music/TheXX/")
    # initialize player
# stop program | |
def stop():
    """Announce that the program has stopped."""
    print("MusicPi stopped")
# main loop: reads commands from standard input
def main():
    """Run the interactive command loop until the user quits.

    Every input line is parsed once and offered to ``map_command``; when that
    returns 1 the same arguments are forwarded to the ``SongPlayer``.

    The loop ends on end-of-input or Ctrl-C, and also when a player command
    raises ``SystemExit``. ``stop()`` runs in a ``finally`` block so the
    shutdown message is printed on every exit path; previously it sat after an
    infinite loop and could never run.
    """
    start()
    songThread = SongPlayer(None)
    try:
        while True:
            try:
                inp = input()
            except (EOFError, KeyboardInterrupt):
                # Closed stdin or Ctrl-C is a normal way to quit.
                break
            args = parse_input(inp)
            if map_command(args) == 1:
                songThread.acceptarg0ut(args)  # this is terrible
    finally:
        stop()
    raise SystemExit
# Run the interactive loop only when this file is executed as a script, so
# that importing the module (e.g. to reach stop()) does not start it.
if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from myExceptions import * | |
from debugMessages import * | |
from random import randint | |
# Pending songs, stored as file paths; new songs are appended at the end.
queue = []
# When True, next_index() picks a random queue position instead of index 0.
shuffle = False
# add file to queue | |
def add_file(fl):
    """Append *fl* to the queue if ``validate_file`` accepts it.

    Prints the 1-based queue position on success, or an "Invalid file"
    message when the file is rejected.
    """
    if not validate_file(fl):
        print("Invalid file: " + fl)
        return
    queue.append(fl)
    print("Added " + str(fl) + " to spot " + str(len(queue)))
# remove matching files from queue | |
def remove_files(fl):
    """Remove every queued file whose path contains *fl* (case-insensitive).

    A line naming the removed song is printed for each deletion.
    """
    needle = fl.lower()
    position = 0
    # The index only advances past entries that are kept, because deleting an
    # entry shifts the following ones down into the current position.
    while position < get_length_items():
        entry = queue[position]
        if needle in entry.lower():
            del queue[position]
            print(filename_to_song(entry) + " removed")
        else:
            position += 1
# add file to next position | |
def add_file_next(fl):
    """Insert *fl* at the front of the queue so it is the next song.

    The arguments to ``list.insert`` were previously swapped
    (``insert(fl, 0)``), which raised ``TypeError`` for any path. The file is
    now also checked with ``validate_file``, as ``add_file`` does.

    Args:
        fl: path of the file to queue.
    """
    if not validate_file(fl):
        print("Invalid file: " + str(fl))
        return
    queue.insert(0, fl)
    print("Added " + str(fl) + " to next spot in queue")
# add all contents of folder to queue | |
def add_folder(dirLoc):
    """Queue all contents of folder *dirLoc* (not implemented yet).

    Raises:
        NotImplementedError: always. An explicit raise is used instead of
            ``assert False`` because asserts are stripped under ``python -O``.
    """
    raise NotImplementedError("Not Yet Implemented")
# add all contents of folder to beginning of queue | |
def add_folder_next(dirLoc):
    """Queue all contents of *dirLoc* at the front (not implemented yet).

    Raises:
        NotImplementedError: always. An explicit raise is used instead of
            ``assert False`` because asserts are stripped under ``python -O``.
    """
    raise NotImplementedError("Not Yet Implemented")
# show the next song in the queue | |
def show_next():
    """Print the entry at the front of the queue.

    Raises:
        NoMoreSongsError: if the queue is empty.
    """
    if get_length_items() == 0:
        raise NoMoreSongsError()
    print(queue[0])
# pull the next song out of the queue | |
def pull_next():
    """Remove and return the next song, as chosen by ``next_index``.

    Raises:
        NoMoreSongsError: if the queue is empty.
    """
    if get_length_items() == 0:
        raise NoMoreSongsError()
    return queue.pop(next_index())
# gets the index of the next song | |
def next_index():
    """Return the queue index of the song to play next.

    Without shuffle this is always 0. With shuffle a random valid index is
    drawn and printed together with the queue length.
    """
    if not shuffle:
        return 0
    chosen = randint(0, get_length_items() - 1)
    print(str(chosen) + " " + str(get_length_items()))
    return chosen
# prints the queue | |
def show_queue(fullName=False):
    """Print the queue length followed by one numbered line per entry.

    Args:
        fullName: print the stored path when True, otherwise only the song
            name derived by ``filename_to_song``.
    """
    print("Length: " + str(len(queue)))
    for number, entry in enumerate(queue, start=1):
        label = entry if fullName else filename_to_song(entry)
        print(str(number) + ": " + label)
# gets the length of the queue, in songs | |
def get_length_items():
    """Return how many songs are currently queued."""
    return len(queue)
# gets the length of the queue, in time | |
def get_length_time():
    """Return the total playing time of the queue (not implemented yet).

    Raises:
        NotImplementedError: always. An explicit raise is used instead of
            ``assert False`` because asserts are stripped under ``python -O``.
    """
    raise NotImplementedError("Not Yet Implemented")
# toggle shuffle | |
def set_shuffle(shouldShuffle):
    """Store *shouldShuffle* as the module-wide shuffle flag and print it."""
    global shuffle
    shuffle = shouldShuffle
    print("Shuffle: {}".format(shuffle))
# check for valid file extension | |
def validate_file(fl):
    """Return True when *fl* has an ``.mp3`` extension.

    The comparison ignores case, so ``SONG.MP3`` is accepted as well;
    previously such files were rejected as invalid.

    Args:
        fl: file name or path as a string.
    """
    return fl.lower().endswith(".mp3")  # expand later to accept more filetypes(?)
def is_empty():
    """Return True when no songs are queued."""
    return not get_length_items()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class NoMoreSongsError(Exception):
    """Signals that the queue has no song left to hand out."""
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# Installs what MusicPi needs: the D-Bus development headers, the
# omxplayer-wrapper Python package, and video-group access for the user.

# download dependencies
sudo apt-get install libdbus-1-dev
pip3 install omxplayer-wrapper

# download example sound
#wget http://rpf.io/lamp3 -O example.mp3 --no-check-certificate

# allow video access
# -a appends to the existing groups. Without it, -G replaces the user's whole
# supplementary group list, dropping memberships such as sudo or audio.
sudo usermod -aG video jay
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from musicQueue import * | |
from myExceptions import * | |
from debugMessages import * | |
from commandMapper import * | |
from omxplayer.player import * | |
from pathlib import Path | |
from threading import Thread | |
from time import sleep | |
# definitions | |
outputArgs = ['-o', 'local']  # extra arguments passed to every OMXPlayer
loopSleepTime = 1  # second
volIncr = 0.1  # step used by "vol up" / "vol down"
volLim = [0, 1]  # [lowest, highest] volume that volume_handler will set


class SongPlayer(object):
    """Plays the queued songs and reacts to player commands.

    ``instance`` holds the current ``OMXPlayer`` or ``None`` when nothing is
    loaded. A daemon thread runs ``play_main``, which polls the player once
    per ``loopSleepTime`` seconds and starts the next song when needed.
    """

    def __init__(self, instance):
        """Load the first queued song and start the background thread.

        Args:
            instance: unused; kept so existing ``SongPlayer(None)`` calls
                continue to work.
        """
        self.instance = self.init_instance()
        thread = Thread(target=self.play_main)
        thread.daemon = True
        thread.start()

    def __del__(self, instance=None):
        """Best-effort cleanup when the object is finalized.

        ``instance`` used to be a required parameter, which made every
        finalizer call fail with ``TypeError``; it now defaults to ``None``
        and is ignored.
        """
        try:
            self.stop()
        except Exception as e:
            # A player that is already dead must not block the shutdown.
            print(e)
        try:
            # Imported here because main imports this module.
            from main import stop
        except ImportError:
            return
        stop()

    def stop(self):
        """Stop the current player, if there is one."""
        instance = getattr(self, "instance", None)
        if instance is None:
            print("null instance on del")
        else:
            instance.stop()

    # play the next song, return new instance
    def play_next(self):
        """Stop the current song and start the next one from the queue.

        The next path is pulled exactly once. It is loaded into the existing
        player when that player could be stopped cleanly, otherwise into a
        fresh one, so a failing ``load`` no longer skips a song. When the
        queue is empty ``instance`` becomes ``None``.
        """
        print("playing next")
        try:
            reusable = self.instance
            if reusable is not None:
                try:
                    reusable.stop()
                except Exception:
                    # Usually the player has already exited; use a new one.
                    reusable = None

            path = Path(pull_next())  # NoMoreSongsError when queue is empty
            try:
                player = self._player_for(reusable, path)
            except Exception:
                self.instance = None
                raise
            self.instance = player

            player.play()
            if not player.is_playing():
                raise RuntimeError("player did not start: " + str(path))
            show_player_data(player)
        except NoMoreSongsError:
            print("No more songs in queue")
            self.instance = None
        except Exception as e:
            print(e)

    def _player_for(self, reusable, path):
        """Return a player for *path*, reusing *reusable* if it can load it."""
        if reusable is not None:
            try:
                reusable.load(path)
                return reusable
            except Exception as e:
                print(e)  # fall back to a fresh player for the same song
        return OMXPlayer(path, args=outputArgs)

    # init instance
    def init_instance(self):
        """Create a player for the next queued song and start it.

        Returns:
            The new ``OMXPlayer``, or ``None`` when the queue is empty or the
            player could not be created.
        """
        instance = None
        try:
            path = Path(pull_next())
            instance = OMXPlayer(path, args=outputArgs)
            instance.play()
            show_player_data(instance)
        except NoMoreSongsError:
            print("No more songs in queue")
        except Exception as e:
            print(e)
        return instance

    # accept command input
    def acceptarg0ut(self, args):
        """Run a player command given as a list of words.

        One-word commands: stop/exit, skip, current/song, play, pause, mute,
        unmute, dur/duration. Two-word command: ``vol <+|up|-|dwn|down|n>``.
        Commands that need a player print a notice instead of crashing when
        nothing is loaded.

        Returns:
            -1 for an empty argument list, otherwise ``None``.

        Raises:
            SystemExit: for ``stop`` / ``exit`` after the player is stopped.
        """
        size = len(args)
        if size == 0:
            return -1

        arg0 = args[0]
        if size == 1:
            needs_player = ("current", "song", "play", "pause", "mute",
                            "unmute", "dur", "duration")
            if arg0 in ("stop", "exit"):
                self.stop()
                raise SystemExit
            elif arg0 == "skip":
                self.play_next()
                if self.instance is not None:
                    self.instance.play()
            elif arg0 not in needs_player:
                print("unknown SongPlayer command: " + arg0)
            elif self.instance is None:
                print("no song is loaded")
            elif arg0 in ("current", "song"):
                print(filename_to_song(self.instance.get_filename()))
            elif arg0 == "play":
                self.instance.play()
                print("Play")
            elif arg0 == "pause":
                self.instance.pause()
                print("Pause")
            elif arg0 == "mute":
                self.instance.mute()
            elif arg0 == "unmute":
                self.instance.unmute()
            else:  # "dur" / "duration"
                print(to_minutes(self.instance.duration()))
            return

        if size == 2:
            if arg0 == "vol":
                self.volume_handler(args[1])
            else:
                # Report the command word, not its argument.
                print("unknown SongPlayer command: " + arg0)
            return

        print("too many SongPlayer arguments")
        return

    def volume_handler(self, arg):
        """Change the player volume.

        Args:
            arg: ``"+"``/``"up"`` or ``"-"``/``"dwn"``/``"down"`` to step by
                ``volIncr`` (the result is clamped to ``volLim``), or a number
                inside ``volLim`` to set the volume directly.
        """
        if self.instance is None:
            print("no song is loaded")
            return

        lowest, highest = volLim
        if arg in ("+", "up"):
            target = self.instance.volume() + volIncr
        elif arg in ("-", "dwn", "down"):
            target = self.instance.volume() - volIncr
        else:
            try:
                target = float(arg)
            except (TypeError, ValueError):
                print("unknown volume argument: " + str(arg))
                return
            if not lowest <= target <= highest:
                print("volume out of range: " + str(arg))
                return

        # Clamp so stepping never leaves the allowed range.
        self.instance.set_volume(min(max(target, lowest), highest))

    # background main for separate thread
    def play_main(self):
        """Run ``play_loop`` forever, pausing ``loopSleepTime`` between runs."""
        while True:
            try:
                self.play_loop()
            except Exception as e:  # better error handling
                print(e)
            sleep(loopSleepTime)

    # main execution loop
    def play_loop(self):
        """Do one polling step: start the next song when nothing is playing.

        With no player loaded the next song is started if the queue has one.
        A dead player (``OMXPlayerDeadError`` from ``is_playing``) is replaced
        by the next queued song when there is one.
        """
        try:
            if self.instance is None:
                if get_length_items() == 0:
                    print("waiting for more songs")
                else:
                    print("playing next because null instance")
                    self.play_next()
            elif not self.instance.is_playing():
                print("not playing")
            # else playing
        except OMXPlayerDeadError:
            print("OMXPlayer dead")
            if not is_empty():
                self.play_next()
            else:
                print("waiting for songs")  # add counter?
        except Exception as e:
            print("PlayLoop")
            print(e)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment