Last active
August 4, 2021 03:58
-
-
Save GuilhermeHideki/baeb63919fd51b23d168b700f374d875 to your computer and use it in GitHub Desktop.
Beets plugin for artists
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf-8 | |
"""Update library's tags using MusicBrainz. | |
""" | |
from beets.plugins import BeetsPlugin | |
from beets import ui, util | |
from beets.autotag.mb import musicbrainzngs | |
from beets.autotag import hooks | |
from collections import defaultdict | |
import re | |
import sys | |
from enum import IntFlag, auto | |
assert sys.version_info >= (3, 6) | |
MBID_REGEX = r"(\d|\w){8}-(\d|\w){4}-(\d|\w){4}-(\d|\w){4}-(\d|\w){12}" | |
class FileChange(IntFlag): | |
PRETEND = auto() | |
MOVE = auto() | |
WRITE = auto() | |
def apply_item_changes(lib, item, flags: FileChange): | |
"""Store, move or write the item according to the arguments. | |
""" | |
ui.show_model_changes(item) | |
if FileChange.PRETEND in flags: | |
return | |
if FileChange.MOVE in flags and lib.directory in util.ancestry(item.path): | |
item.move(with_album=False) | |
if FileChange.WRITE in flags: | |
item.try_write() | |
item.store() | |
def get_credit(data, field, separator): | |
return separator.join(a['artist'][field] | |
for a in data['artist-credit'][0::2]) | |
def get_release(mb_album_id): | |
rels = ['artists', 'recordings', 'artist-credits'] | |
r = musicbrainzngs.get_release_by_id(mb_album_id, rels) | |
return r['release'] | |
def get_tracks(release, artist_separator, mbid_separator): | |
for medium in release['medium-list']: | |
for track in medium['track-list']: | |
yield { | |
'id': track['id'], | |
'disc': medium['position'], | |
'track': track['number'], | |
'mb_artistid': get_credit(track, 'id', mbid_separator), | |
'artist': get_credit(track, 'name', artist_separator), | |
} | |
class MBArtistsPlugin(BeetsPlugin): | |
def __init__(self): | |
super(MBArtistsPlugin, self).__init__() | |
self.config.add({ | |
'auto': True, | |
'mbid_separator': '/', | |
'artist': { | |
'separator': ';', | |
} | |
}) | |
self.template_fields['artist_1'] = lambda i: i.artist.split( | |
self.config['artist']['separator'].get(str))[0] | |
f = lambda a: a.albumartist.split( | |
self.config['artist']['separator'].get(str))[0] | |
self.album_template_fields['albumartist_1'] = f | |
if self.config['auto']: | |
self.import_stages = [self.imported] | |
def commands(self): | |
def func(lib, opts, args): | |
""" | |
Command handler for the artists function. | |
""" | |
query = ui.decargs(args) | |
change_flags = FileChange(sum([ | |
FileChange.PRETEND if opts.pretend else 0, | |
FileChange.MOVE if ui.should_move(opts.move) else 0, | |
FileChange.WRITE if ui.should_write(opts.write) else 0, | |
])) | |
self.albums(lib, query, change_flags) | |
cmd = ui.Subcommand( | |
'artists', | |
help=u'Multiple artists from MusicBrainz') | |
cmd.parser.add_option( | |
u'-p', u'--pretend', action='store_true', | |
help=u'show all changes but do nothing') | |
cmd.parser.add_option( | |
u'-m', u'--move', action='store_true', dest='move', | |
help=u"move files in the library directory") | |
cmd.parser.add_option( | |
u'-M', u'--nomove', action='store_false', dest='move', | |
help=u"don't move files in library") | |
cmd.parser.add_option( | |
u'-w', u'--write', action='store_true', dest='write', | |
help=u"write updated metadata to files") | |
cmd.parser.add_option( | |
u'-W', u'--nowrite', action='store_false', dest='write', | |
help=u"don't write updated metadata to files") | |
cmd.func = func | |
return [cmd] | |
def is_mb_release(self, a): | |
if not a.mb_albumid: | |
self._log.info(u'Skipping album with no mb_albumid: {0}', | |
format(a)) | |
return False | |
if not re.match(MBID_REGEX, a.mb_albumid): | |
self._log.info(u'Skipping album with invalid mb_albumid: {0}', | |
format(a)) | |
return False | |
return True | |
def imported(self, session, task): | |
for item in task.imported_items(): | |
self.update_data(item) | |
item.store() | |
def update_data(self, a): | |
if not self.is_mb_release(a): | |
return False | |
release = get_release(a.mb_albumid) | |
artist_sep: str = self.config['artist']['separator'].get(str) | |
mbid_sep: str = self.config['mbid_separator'].get(str) | |
a.albumartist = get_credit(release, 'name', artist_sep) | |
tracks = defaultdict(list) | |
for track in get_tracks(release, artist_separator=artist_sep, | |
mbid_separator=mbid_sep): | |
tracks[track['id']].append(track) | |
if tracks[a['mb_releasetrackid']]: | |
choices = tracks[a['mb_releasetrackid']] | |
if len(choices) == 1: | |
a['artist'] = choices[0]['artist'] | |
a['mb_artistid'] = choices[0]['mb_artistid'] | |
ui.show_model_changes(a) | |
a.store() | |
def albums(self, lib, query, change_flags): | |
"""Retrieve and apply info from the autotagger for albums matched by | |
query and their items. | |
""" | |
artist_sep: str = self.config['artist']['separator'].get(str) | |
mbid_sep: str = self.config['mbid_separator'].get(str) | |
# Process matching albums. | |
for a in [a for a in lib.albums(query) if self.is_mb_release(a)]: | |
# Get the MusicBrainz album information. | |
# TODO: Use information from get_release | |
album_info = hooks.album_for_mbid(a.mb_albumid) | |
if not album_info: | |
self._log.info(u'Release ID {0} not found for album {1}', | |
a.mb_albumid, | |
format(a)) | |
continue | |
release = get_release(a.mb_albumid) | |
a.albumartist = get_credit(release, 'name', artist_sep) | |
tracks = defaultdict(list) | |
for track in get_tracks(release, | |
artist_separator=artist_sep, | |
mbid_separator=mbid_sep): | |
tracks[track['id']].append(track) | |
items = list(a.items()) | |
for item in [i for i in items if tracks[i.mb_releasetrackid]]: | |
choices = tracks[item.mb_releasetrackid] | |
if len(choices) == 1: | |
item['artist'] = choices[0]['artist'] | |
item['mb_artistid'] = choices[0]['mb_artistid'] | |
apply_item_changes(lib, item, change_flags) | |
else: | |
# TODO: We got 2 or more tracks with the same recording | |
self._log.debug(item) | |
self._log.debug(choices) | |
self._log.debug() | |
if FileChange.PRETEND not in change_flags: | |
a.store() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment