Last active
May 1, 2020 12:34
-
-
Save ychalier/8dbb992e5a474e41cb6af0bab22c9fee to your computer and use it in GitHub Desktop.
Musicater: music library formatter and validator.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# pylint: disable=E0401 | |
"""Musicater: music library formatter and validator. | |
""" | |
__version__ = "1.2.0" | |
__author__ = "Yohan Chalier" | |
__license__ = "MIT" | |
__email__ = "yohan@chalier.fr" | |
import os | |
import re | |
import glob | |
import logging | |
import argparse | |
import subprocess | |
import slugify | |
import eyed3 | |
import eyed3.mp3 | |
# Recognises a track title that carries a featured-artist mention
# ("ft", "feat", "Feat.", "featuring", ... optionally wrapped in
# parentheses or square brackets). Capture groups:
#   1. the title text located before the mention,
#   2. the featured artist(s),
#   3. an optional trailing " (... Mix)" / " [... Version]" suffix.
# A "(Prod. ...)" / "(Perf. ...)" parenthetical following the artist is
# matched but not captured.
FEATURING_PATTERN = re.compile(
    r"^(.*) [\(\[]?[Ff](?:ea|EA)?[tT](?:uring)?\.?"
    r" ?(.*?)\.?[\)\]]?(?: [\(\[](?:Prod|Perf)\..*?"
    r"[\]\)])?( [\(\[].*?(?:[Mm][Ii][Xx]|[Vv]ersion)"
    r"[\]\)])?$"
)
# Genre names accepted for the TCON frame: validate_field() treats a genre
# as valid only when it appears, with this exact spelling, in this list.
# NOTE(review): the ordering presumably follows the ID3v1/Winamp genre
# numbering -- confirm before reordering or deduplicating.
ID3_GENRES = [
    "Blues", "Classic Rock", "Country", "Dance", "Disco", "Funk", "Grunge",
    "Hip-Hop", "Jazz", "Metal", "New Age", "Oldies", "Other", "Pop", "R&B",
    "Rap", "Reggae", "Rock", "Techno", "Industrial", "Alternative", "Ska",
    "Death Metal", "Pranks", "Soundtrack", "Euro-Techno", "Ambient", "Trip-Hop",
    "Vocal", "Jazz+Funk", "Fusion", "Trance", "Classical", "Instrumental",
    "Acid", "House", "Game", "Sound Clip", "Gospel", "Noise", "AlternRock",
    "Bass", "Soul", "Punk", "Space", "Meditative", "Instrumental Pop",
    "Instrumental Rock", "Ethnic", "Gothic", "Darkwave", "Techno-Industrial",
    "Electronic", "Pop-Folk", "Eurodance", "Dream", "Southern Rock", "Comedy",
    "Cult", "Gangsta Rap", "Top 40", "Christian Rap", "Pop / Funk", "Jungle",
    "Native American", "Cabaret", "New Wave", "Psychedelic", "Rave",
    "Showtunes", "Trailer", "Lo-Fi", "Tribal", "Acid Punk", "Acid Jazz",
    "Polka", "Retro", "Musical", "Rock & Roll", "Hard Rock", "Folk",
    "Folk-Rock", "National Folk", "Swing", "Fast Fusion", "Bebob", "Latin",
    "Revival", "Celtic", "Bluegrass", "Avantgarde", "Gothic Rock",
    "Progressive Rock", "Psychedelic Rock", "Symphonic Rock", "Slow Rock",
    "Big Band", "Chorus", "Easy Listening", "Acoustic", "Humour", "Speech",
    "Chanson", "Opera", "Chamber Music", "Sonata", "Symphony", "Booty Bass",
    "Primus", "Porn Groove", "Satire", "Slow Jam", "Club", "Tango", "Samba",
    "Folklore", "Ballad", "Power Ballad", "Rhythmic Soul", "Freestyle", "Duet",
    "Punk Rock", "Drum Solo", "A Cappella", "Euro-House", "Dance Hall", "Goa",
    "Drum & Bass", "Club-House", "Hardcore", "Terror", "Indie", "BritPop",
    "Negerpunk", "Polsk Punk", "Beat", "Christian Gangsta Rap", "Heavy Metal",
    "Black Metal", "Crossover", "Contemporary Christian", "Christian Rock",
    "Merengue", "Salsa", "Thrash Metal", "Anime", "JPop", "Synthpop",
    "Abstract", "Art Rock", "Baroque", "Bhangra", "Big Beat", "Breakbeat",
    "Chillout", "Downtempo", "Dub", "EBM", "Eclectic", "Electro",
    "Electroclash", "Emo", "Experimental", "Garage", "Global", "IDM",
    "Illbient", "Industro-Goth", "Jam Band", "Krautrock", "Leftfield",
    "Lounge", "Math Rock", "New Romantic", "Nu-Breakz", "Post-Punk",
    "Post-Rock", "Psytrance", "Shoegaze", "Space Rock", "Trop Rock",
    "World Music", "Neoclassical", "Audiobook", "Audio Theatre",
    "Neue Deutsche Welle", "Podcast", "Indie Rock", "G-Funk", "Dubstep",
    "Garage Rock", "Psybient"
]
# Frame identifiers the tool cares about, mapped to the human-readable label
# used in log messages. validator() checks each of these frames on every
# file, and RawAudioFile.clean_frames() deletes any frame whose identifier
# is not a key of this mapping.
MAIN_FRAMES = {
    "TIT2": "Title",
    "TRCK": "Track",
    "TPOS": "Disc",
    "TPE1": "Artist",
    "TPE2": "Album artist",
    "TYER": "Year",
    "TCON": "Genre",
    "TALB": "Album",
    "APIC": "Cover",
}
def validate_field(field, value):
    """Tell whether a parsed frame value is acceptable for its field.

    Args:
        field: frame identifier, e.g. "TYER", "TCON" or "APIC".
        value: the value produced by read_field() for that frame.

    Returns:
        A bool. "TYER" must be a string made of exactly four digits,
        "TCON" must be one of ID3_GENRES, "APIC" must equal 3, and any
        other field is always accepted.
    """
    if field == "TYER":
        # fullmatch() rather than match() with "^...$": "$" also matches
        # just before a trailing newline, which would let "1999\n" through.
        return (
            isinstance(value, str)
            and re.fullmatch(r"\d{4}", value) is not None
        )
    if field == "TCON":
        return value in ID3_GENRES
    if field == "APIC":
        # The value is the picture type read from the frame; 3 is
        # presumably the "front cover" type -- TODO confirm against ID3v2.
        return value == 3
    return True
def read_field(key, values):
    """Extract the usable value of a frame from its list of occurrences.

    Only the first element of ``values`` is looked at.

    Args:
        key: frame identifier, e.g. "APIC", "TRCK", "TIT2".
        values: non-empty sequence of frame objects stored under ``key``.

    Returns:
        * "APIC": the ``picture_type`` attribute of the frame;
        * "TRCK" / "TPOS": the number located before the optional "/"
          as an int, or None when that text is not a valid integer;
        * any other frame exposing a ``text`` attribute: that text with
          surrounding whitespace stripped;
        * otherwise: an empty string.
    """
    frame = values[0]
    if key == "APIC":
        return frame.picture_type
    if key in ("TRCK", "TPOS"):
        # "3/12" and "3" both yield 3. A malformed text ("", "A1", "/12")
        # yields None instead of raising, so that callers can report the
        # field as invalid rather than abort the whole run.
        try:
            return int(frame.text.split("/")[0])
        except ValueError:
            return None
    if hasattr(frame, "text"):
        return frame.text.strip()
    return ""
def clean_title(original_title):
    """Normalise the featured-artist mention of a track title.

    When the title matches FEATURING_PATTERN it is rebuilt as
    "<title> (feat. <artists>)", followed by the captured mix/version
    suffix if there is one. A title that does not match is kept as is.
    In both cases the result is stripped of surrounding whitespace.
    """
    found = FEATURING_PATTERN.match(original_title)
    if found is None:
        return original_title.strip()
    main_part, guests, suffix = found.groups()
    rebuilt = "%s (feat. %s)" % (main_part, guests)
    if suffix is not None:
        rebuilt += suffix
    return rebuilt.strip()
class RawAudioFile(eyed3.mp3.Mp3AudioFile):
    """Extend eyed3.mp3.Mp3AudioFile to have access to the raw frames.

    Instances are obtained through ``RawAudioFile.load(filename)``, which
    also fills the ``frames`` attribute: a dict mapping each frame
    identifier (as a str) to the value returned by read_field().
    """

    @staticmethod
    def load(filename):
        """Load a file with eyed3 and cast the result into a RawAudioFile.

        Returns None when ``eyed3.load`` returns None for that file.
        """
        eyed3_audiofile = eyed3.load(filename)
        if eyed3_audiofile is None:
            return None
        # Re-class the object in place so that the extra methods below
        # become available without re-reading the file.
        eyed3_audiofile.__class__ = RawAudioFile
        eyed3_audiofile.frames = eyed3_audiofile.gather_frames()
        return eyed3_audiofile

    def get(self, field):
        """Return the value gathered for ``field``, or None when the frame
        is absent or its value is rejected by validate_field().
        """
        value = self.frames.get(field)
        if value is not None and validate_field(field, value):
            return value
        return None

    def gather_frames(self):
        """Build the {frame identifier: parsed value} dict of this file.

        An empty dict is returned when the file carries no tag at all, so
        that every field is simply seen as missing.
        """
        if self._tag is None:
            return {}
        frames = {}
        for key, values in self._tag.frame_set.items():
            # Frame identifiers are stored as bytes in the frame set.
            name = key.decode("ascii")
            frames[name] = read_field(name, values)
        return frames

    def clean_frames(self):
        """Delete from the tag every frame that is not listed in
        MAIN_FRAMES. Does nothing when the file carries no tag.
        """
        if self._tag is None:
            return
        # Iterate over a copy of the keys: the frame set is mutated.
        for key in list(self._tag.frame_set.keys()):
            if key.decode("ascii") not in MAIN_FRAMES:
                del self._tag.frame_set[key]
def converter(args):
    """Convert the non MP3 files of a directory to MP3 using ffmpeg.

    Every regular file directly inside ``args.input_folder`` whose
    extension is not ".mp3" is proposed for conversion (the user is
    prompted for each of them). When ``args.convert`` is set, every file,
    MP3 or not, is re-encoded without prompting.

    The conversion writes "<name>.tmp.mp3" first; only when ffmpeg exits
    successfully is the source file removed and the temporary file renamed
    to "<name>.mp3". On failure the source file is left untouched and any
    partial output is deleted.

    Args:
        args: namespace exposing ``input_folder`` (str) and ``convert``
            (bool).
    """
    logging.info("Checking for files to convert in %s", args.input_folder)
    asked = 0
    for filename in glob.glob(os.path.join(args.input_folder, "*")):
        if os.path.isdir(filename):
            continue
        basename, extension = os.path.splitext(filename)
        if extension == ".mp3" and not args.convert:
            continue
        asked += 1
        if not args.convert:
            choice = "foo"
            while choice.lower() not in ["", "y", "yes", "n", "no"]:
                choice = input("File '%s' is not an MP3. Convert it? (y/n) "
                               % filename)
            if "y" not in choice.lower():
                continue
        temporary = basename + ".tmp.mp3"
        # ffmpeg output is deliberately left attached to the console.
        returncode = subprocess.call(
            ["ffmpeg", "-i", filename, "-y", temporary]
        )
        if returncode != 0:
            # Never trade the original for a truncated or corrupt output.
            logging.error(
                "Conversion of '%s' failed (ffmpeg exit code %d)",
                filename, returncode
            )
            if os.path.isfile(temporary):
                os.remove(temporary)
            continue
        if os.path.isfile(temporary):
            os.remove(filename)
            os.rename(temporary, basename + ".mp3")
    if asked == 0:
        logging.info("No file to convert found")
def validator(args):  # pylint: disable=R0912
    """Check the consistency of the MP3 files found in a folder.

    Each "*.mp3" file of ``args.input_folder`` is loaded and every frame
    of MAIN_FRAMES is inspected: a missing or invalid title or track is an
    error, any other missing or invalid frame is a warning. The values
    seen across the folder are then checked: album and album artist must
    be unique; years, genres and artists must be unique unless the
    matching ``args.multiple_*`` flag is set (in which case several values
    only raise a warning); disc numbers, and track numbers of each disc,
    must form the sequence 1..max.

    Returns:
        A tuple ``(errors, warnings)`` holding the two counters.
    """
    folder = args.input_folder
    logging.info("Validating folder %s", folder)
    seen_values = {frame_id: set() for frame_id in MAIN_FRAMES}
    tracks_by_disc = {}
    error_count = 0
    warning_count = 0

    # Per-file checks.
    for path in glob.glob(os.path.join(folder, "*.mp3")):
        logging.info("Checking file '%s'", path)
        song = RawAudioFile.load(path)
        if song is None:
            logging.error("Wrong file '%s'", path)
            error_count += 1
            continue
        for frame_id, label in MAIN_FRAMES.items():
            content = song.get(frame_id)
            if content is None:
                if frame_id in ("TIT2", "TRCK"):
                    logging.error(
                        "Invalid or Missing %s of '%s'",
                        label, os.path.basename(path)
                    )
                    error_count += 1
                else:
                    logging.warning(
                        "Invalid or Missing %s of '%s'",
                        label, os.path.basename(path)
                    )
                    warning_count += 1
                continue
            if frame_id == "TRCK":
                # Track numbers are grouped by disc (possibly None).
                tracks_by_disc.setdefault(song.get("TPOS"), []).append(
                    content
                )
            seen_values[frame_id].add(content)

    # Folder-wide uniqueness checks.
    uniqueness_rules = (
        ("TALB", False),
        ("TPE2", False),
        ("TYER", args.multiple_years),
        ("TCON", args.multiple_genres),
        ("TPE1", args.multiple_artists),
    )
    for frame_id, allow_multiple in uniqueness_rules:
        distinct = seen_values[frame_id]
        if allow_multiple and len(distinct) > 1:
            logging.warning(
                "Multiple %ss: %s",
                MAIN_FRAMES[frame_id], sorted(distinct)
            )
            warning_count += 1
        elif len(distinct) != 1:
            logging.error(
                "Invalid, Missing or Multiple %ss: %s",
                MAIN_FRAMES[frame_id], sorted(distinct)
            )
            error_count += 1

    # Numbering checks: discs first, then tracks of each disc.
    disc_numbers = seen_values["TPOS"]
    if not disc_numbers or (
            sorted(disc_numbers) != list(range(1, max(disc_numbers) + 1))):
        logging.error("Invalid %ss numbering", MAIN_FRAMES["TPOS"])
        error_count += 1
    for disc, numbers in tracks_by_disc.items():
        expected = list(range(1, max(numbers) + 1))
        if sorted(numbers) != expected:
            logging.error(
                "Invalid %ss numbering for disc %s",
                MAIN_FRAMES["TRCK"], disc
            )
            error_count += 1

    logging.info(
        "Validation found %d error(s) and %d warning(s)",
        error_count, warning_count
    )
    return error_count, warning_count
class AlbumInfo:  # pylint: disable=R0903
    """Wrapper for album global metadata.

    Attributes:
        album: album name.
        album_artist: album artist name.
        year: the most frequent key of the ``years`` counter, or None when
            that counter is empty.
        genre: the most frequent key of the ``genres`` counter, or None
            when that counter is empty.
        tracks: dict mapping a disc key to its number of tracks.
        discs: number of discs.
    """

    def __init__(self, album, album_artist, years,  # pylint: disable=R0913
                 genres, tracks, discs):
        """Store the metadata and elect the dominant year and genre.

        Args:
            album: album name.
            album_artist: album artist name.
            years: dict mapping a year to its number of occurrences.
            genres: dict mapping a genre to its number of occurrences.
            tracks: dict mapping a disc key to its number of tracks.
            discs: number of discs.
        """
        self.album = album
        self.album_artist = album_artist
        self.year = self._most_frequent(years)
        self.genre = self._most_frequent(genres)
        self.tracks = tracks
        self.discs = discs

    @staticmethod
    def _most_frequent(counts):
        """Return the key with the highest count (the first one on ties),
        or None for an empty counter instead of letting max() raise.
        """
        if not counts:
            return None
        return max(counts.items(), key=lambda item: item[1])[0]

    def __str__(self):
        return "\n".join([
            "Album:\t'%s'" % self.album,
            "Artist:\t'%s'" % self.album_artist,
            "Year:\t%s" % self.year,
            "Genre:\t%s" % self.genre,
            # "%s" rather than "%d": a disc key may be None when a file
            # has no disc number, and "%d" would raise on it.
            "Tracks:\t%s" % ", ".join(
                "%s on disc %s" % (count, disc)
                for disc, count in self.tracks.items()
            )
        ])
def gather_album_info(args):
    """Read the MP3 files of a folder and return an AlbumInfo object.

    The album and album artist are taken from the first file providing a
    valid value. Valid years and genres are counted, tracks are counted
    per disc key (None for files without a valid disc number) and the
    number of discs is the highest disc number met.

    Args:
        args: namespace exposing ``input_folder`` (str).
    """
    years = {}
    genres = {}
    tracks = {}
    discs = 0
    album = None
    album_artist = None
    for filename in glob.glob(os.path.join(args.input_folder, "*.mp3")):
        audiofile = RawAudioFile.load(filename)
        if audiofile is None:
            logging.warning("Skipping unreadable file '%s'", filename)
            continue
        if album is None:
            album = audiofile.get("TALB")
        if album_artist is None:
            album_artist = audiofile.get("TPE2")
        year = audiofile.get("TYER")
        if year is not None:
            years[year] = years.get(year, 0) + 1
        genre = audiofile.get("TCON")
        if genre is not None:
            genres[genre] = genres.get(genre, 0) + 1
        disc = audiofile.get("TPOS")
        tracks[disc] = tracks.get(disc, 0) + 1
        # A file may lack a disc number; comparing None with an int
        # raises TypeError, so such files do not affect the disc count.
        if disc is not None:
            discs = max(discs, disc)
    return AlbumInfo(album, album_artist, years, genres, tracks, discs)
def formatter(args):
    """Retag, rename and move the MP3 files of a folder.

    The album metadata gathered by gather_album_info() is shown to the
    user for confirmation (skipped when ``args.assume_yes`` is set). Then,
    for each "*.mp3" file of ``args.input_folder``: frames outside
    MAIN_FRAMES are removed, the title is cleaned, missing artist, album,
    year, album artist and genre are filled from the album metadata, the
    track and disc totals are written, and the file is moved to
    "<input_folder>/<album-artist-slug>/<album-slug>/" under the name
    "<track>-<title-slug>.mp3" (prefixed by "<disc>-" when the album has
    several discs).

    Args:
        args: namespace exposing ``input_folder`` (str) and
            ``assume_yes`` (bool).
    """
    logging.info("Formatting folder %s", args.input_folder)
    info = gather_album_info(args)
    if args.assume_yes:
        logging.info("Assuming the following album metadata:\n%s", info)
    else:
        answer = input(
            "\nAre those metadata correct?\n\n%s\n\nyes/no> " % info
        )
        # Case and surrounding blanks are ignored, as for the conversion
        # prompt: "Yes" or "Y " must not cancel the operation.
        if answer.strip().lower() not in ["y", "yes"]:
            logging.warning("Cancelling operations")
            return
    directory = os.path.join(
        args.input_folder,
        slugify.slugify(info.album_artist),
        slugify.slugify(info.album)
    )
    if os.path.isdir(directory):
        logging.warning("Folder %s already exists", directory)
    else:
        os.makedirs(directory)
        logging.info("Creating directory %s", directory)
    for filename in glob.glob(os.path.join(args.input_folder, "*.mp3")):
        audiofile = RawAudioFile.load(filename)
        # audiofile.get() reads the frames gathered at load time, so the
        # checks below still see the values as they were on disk.
        audiofile.clean_frames()
        audiofile.tag.title = clean_title(audiofile.tag.title)
        if audiofile.get("TPE1") is None:
            audiofile.tag.artist = info.album_artist
        if audiofile.get("TALB") is None:
            audiofile.tag.album = info.album
        if audiofile.get("TYER") is None:
            audiofile._tag._setDate(  # pylint: disable=W0212
                "TYER".encode("ascii"),
                info.year
            )
        if audiofile.get("TPE2") is None:
            audiofile.tag.album_artist = info.album_artist
        if audiofile.get("TCON") is None:
            audiofile.tag.genre = info.genre
        # (number, total) pairs: total tracks of the disc, total discs.
        audiofile.tag.track_num = (
            audiofile.get("TRCK"),
            info.tracks[audiofile.get("TPOS")]
        )
        audiofile.tag.disc_num = (audiofile.get("TPOS"), info.discs)
        audiofile.tag.save()
        new_filename = "%.2d-%s.mp3" % (
            audiofile.tag.track_num[0],
            slugify.slugify(audiofile.tag.title)
        )
        if info.discs > 1:
            new_filename = str(audiofile.tag.disc_num[0]) + "-" + new_filename
        os.rename(filename, os.path.join(
            directory,
            new_filename
        ))
def main():
    """Main procedure.

    Parse the command line, check the input folder, then run the
    converter and the validator. Unless ``--validate`` is given, the
    folder is formatted when validation reported no error and either no
    warning or ``--force`` was given.
    """
    logging.basicConfig(level=logging.INFO, format='%(levelname)s %(message)s')
    # eyed3.log.setLevel("ERROR")
    parser = argparse.ArgumentParser(
        description="A music library formatter and validator",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    parser.add_argument(
        "input_folder",
        type=str,
        help="input folder (containing one album)"
    )
    parser.add_argument(
        "-y", "--assume-yes",
        action="store_true",
        help="automatic yes to prompts",
        dest="assume_yes"
    )
    parser.add_argument(
        "-c", "--convert",
        action="store_true",
        help="force conversion of files (for a clean encoding)"
    )
    parser.add_argument(
        "-v", "--validate",
        action="store_true",
        help="only validate the folder"
    )
    parser.add_argument(
        "-f", "--force",
        action="store_true",
        help="format even if there are warnings"
    )
    parser.add_argument(
        "-my", "--multiple-years",
        action="store_true",
        help="allow for multiple years (e.g. for a compilation)",
        dest="multiple_years"
    )
    parser.add_argument(
        "-mg", "--multiple-genres",
        action="store_true",
        help="allow for multiple genres",
        dest="multiple_genres"
    )
    parser.add_argument(
        "-ma", "--multiple-artists",
        action="store_true",
        help="allow for multiple artists",
        dest="multiple_artists"
    )
    args = parser.parse_args()
    # Fix for Powershell trailing quote bug
    # (see https://github.com/PowerShell/PowerShell/issues/7400)
    # endswith() rather than [-1]: an empty argument must reach the error
    # message below instead of raising IndexError.
    if not os.path.isdir(args.input_folder)\
            and args.input_folder.endswith("\""):
        args.input_folder = args.input_folder[:-1]
    if not os.path.isdir(args.input_folder):
        logging.error("Incorrect folder: '%s'", args.input_folder)
        return
    converter(args)
    errors, warnings = validator(args)
    if not args.validate:
        if errors > 0:
            logging.error("Could not format folder because of errors.")
        elif warnings > 0 and not args.force:
            logging.error(" ".join([
                "Could not format folder because of warnings.",
                "Use -f to force formatting."
            ]))
        else:
            formatter(args)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment