Skip to content

Instantly share code, notes, and snippets.

@8h2a
Last active January 26, 2019 16:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save 8h2a/aaabd19591c98fd86415eb75547a265e to your computer and use it in GitHub Desktop.
Save 8h2a/aaabd19591c98fd86415eb75547a265e to your computer and use it in GitHub Desktop.
beets-guess-media
"""
Based on bitdepths/samplerates and (EAC/XLD) log-files,
determine if the media is likely a CD or likely not a CD.
Additionally checks the log files for a TOC,
and tries to get the release ids from musicbrainz.
"""
from beets.plugins import BeetsPlugin
from beets.autotag import TrackInfo
from beets.autotag import hooks
import os.path
import re
from glob import glob
from collections import namedtuple
import io
import musicbrainzngs
MatchData = namedtuple('MatchData', ['has_log', 'ids'])
_matches = {} # dict: key = dirname, value = MatchData
def _get_toc_string_from_log(lines):
"""
Returns a toc string or None for a given log file (EAC or XLD)
Copyright (c) 2018 Konstantin Mochalov
Released under the MIT License
Source: https://gist.github.com/kolen/765526
"""
def _filter_toc_entries(lines):
"""
Take iterator of lines, return iterator of toc entries
"""
while True:
line = lines.next()
# TOC table header:
if re.match(r""" \s*
.+\s+ \| (?#track)
\s+.+\s+ \| (?#start)
\s+.+\s+ \| (?#length)
\s+.+\s+ \| (?#start sec)
\s+.+\s*$ (?#end sec)
""", line, re.X):
lines.next()
break
while True:
line = lines.next()
m = re.match(r"""
^\s*
(?P<num>\d+)
\s*\|\s*
(?P<start_time>[0-9:.]+)
\s*\|\s*
(?P<length_time>[0-9:.]+)
\s*\|\s*
(?P<start_sector>\d+)
\s*\|\s*
(?P<end_sector>\d+)
\s*$
""", line, re.X)
if not m:
break
yield m.groupdict()
PREGAP = 150
try:
entries = list(_filter_toc_entries(lines))
num_entries = len(entries)
tracknums = [int(e['num']) for e in entries]
if range(1, num_entries+1) != tracknums:
# Non-standard track number sequence
return None
leadout_offset = int(entries[-1]['end_sector']) + PREGAP + 1
offsets = [(int(x['start_sector']) + PREGAP) for x in entries]
toc_numbers = [1, num_entries, leadout_offset] + offsets
return " ".join(str(x) for x in toc_numbers)
except Exception as e:
# can fail if the log file is malformed
pass
return None
def _get_releases_from_toc(toc):
"""Returns a list of musicbrainz release IDs from a toc string"""
res = musicbrainzngs.get_releases_by_discid(id="", toc=toc)
if res['release-list']:
return [release['id'] for release in res['release-list']]
def _parse_logfile(filename):
"""
Given a filename, parses a XLD/EAC log file.
Returns a list of musicbrainz-IDs (or an empty list) if possible,
otherwise None.
"""
eac_regex = re.compile(r'Exact Audio Copy*')
xld_regex = re.compile(r'X Lossless Decoder*')
def _read_and_match(file_handle):
line = file_handle.readline()
if eac_regex.match(line) or xld_regex.match(line):
toc = _get_toc_string_from_log(file_handle)
ids = set(_get_releases_from_toc(toc)) if toc else []
return ids
return None
try:
try:
with io.open(filename, encoding='utf-8') as f:
return _read_and_match(f)
except UnicodeDecodeError:
with io.open(filename, encoding='utf-16') as f:
return _read_and_match(f)
except Exception as e:
pass
return None
def _process_items(items):
"""Checks for valid logfiles, extracts TOC if possible,
and adds the results to the global dict.
Returns a set of musicbrainz-IDs if a valid log file was found,
otherwise None.
"""
paths = set(map(lambda item: os.path.dirname(item.path), items))
ids = set()
log_found = False
for path in paths:
matchdata_has_log = False
matchdata_ids = set()
if path not in _matches:
for dirpath, dirnames, filenames in os.walk(path):
for filename in filenames:
if not filename.lower().endswith(".log"):
continue
log_ids = _parse_logfile(os.path.join(dirpath, filename))
if not log_ids:
continue
matchdata_has_log = True
matchdata_ids.update(log_ids)
# Add result for current path to global dict
_matches[path] = MatchData(has_log=matchdata_has_log,
ids=matchdata_ids)
if _matches[path].has_log:
ids.update(_matches[path].ids)
log_found = True
return ids if log_found else None
class GuessMedia(BeetsPlugin):
def __init__(self):
super(GuessMedia, self).__init__()
self.config.add({
'media_weight': 1.0,
'album_id_weight': 1.0,
})
self.register_listener('import_task_start', self.import_task_start)
def import_task_start(self, task, session):
items = task.items if task.is_album else [task.item]
_process_items(items)
def candidates(self, items, artist, album, va_likely):
releases = []
release_ids = _process_items(items)
if not release_ids:
return releases
for id in release_ids:
try: # album_for_mbid may raise a MusicBrainzAPIError
albuminfo = hooks.album_for_mbid(id)
if albuminfo:
releases.append(albuminfo)
except:
pass
return releases
def album_distance(self, items, album_info, mapping):
dist = hooks.Distance()
# check if the album has a log file (from EAC/XLD):
release_ids = _process_items(items)
has_log = release_ids is not None
# get bitdepths and samplerates
bitdepths = set(map(lambda item: item.bitdepth, items))
samplerates = set(map(lambda item: item.samplerate, items))
# Boolean flags to determine media type
is_not_cd = max(bitdepths) > 16 or max(samplerates) != 44100
could_be_cd = not is_not_cd
candidate_media_is_cd = \
"CD" in album_info.media.upper() if album_info.media else False
# penalty for CDs if it's clearly not a CD
if is_not_cd and candidate_media_is_cd:
dist.add('media', self.config['media_weight'].as_number())
# penalty if we think it's a CD (found a log file and
# bitdepths/samplerates are correct) but the candidate media is wrong:
if has_log and could_be_cd and not candidate_media_is_cd:
dist.add('media', self.config['media_weight'].as_number())
# penalty if we found an album id from the log file,
# and the album id does not match:
if release_ids is not None and album_info.album_id not in release_ids:
dist.add('album_id', self.config['album_id_weight'].as_number())
return dist
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment