Skip to content

Instantly share code, notes, and snippets.

@hetsch
Last active July 20, 2016 07:58
Show Gist options
  • Save hetsch/b5e4f019a8cb4298f47dfe493bd67efa to your computer and use it in GitHub Desktop.
Save hetsch/b5e4f019a8cb4298f47dfe493bd67efa to your computer and use it in GitHub Desktop.
Replay gain processing with bs1770gain
#!/bin/bash
## Usage: rate-music [0-5]
#
# Adds the currently playing song to the mpd playlist corresponding to the
# rating assigned. Any previous rating is removed. If 0 is given, the
# song's rating is removed.
#
# From: https://bbs.archlinux.org/viewtopic.php?id=116113

## USER CONFIGURATION-----------------------------------------------------
## Path to playlists
MOUNTPOINT="/Volumes/tank_music"

# mount if not mounted
if ! mount | grep -q "on $MOUNTPOINT"; then
    echo "Mounting the music library"
    # Mounting with password and other stuff
    # see: http://apple.stackexchange.com/a/197608
    osascript -e 'tell application "Finder" to mount volume "smb://192.168.1.143/tank_music"'
fi

PLAYLISTDIR="$MOUNTPOINT/.mpd/playlists"
## END USER CONFIGURATION--------------------------------------------------

## Prefix and suffix strings for the playlist file name
PL_PREFIX='Rating_'
PL_SUFFIX='.m3u'

## Get the current song from mpd via mpc or throw an error.
# (The ncmpcpp/cmus console commands used by the original script did not
# work any more; ncmpcpp console commands have been deprecated.)
SONG=$(mpc -h 192.168.1.143 -f '%file%' current) || { echo "Error: you need mpc installed to run this script. Aborting." >&2; exit 1; }

## Error cases
if [[ -z "$SONG" ]]; then
    echo 'No song is playing.'
    exit 1
fi

# Accept exactly one digit 0-5. A plain "-lt/-gt" test inside [[ ]] evaluates
# non-numeric input arithmetically (as 0), which would silently delete the
# rating instead of reporting a usage error.
if [[ ! "$1" =~ ^[0-5]$ ]]; then
    echo "Rating must be between 1 and 5. Or 0 (zero) to delete the current song's rating."
    exit 1
fi

## Path to lock file
LOCK="/tmp/rate-music.lock"

## Lock the file (flock is not available here, so procmail's lockfile is used)
if ! lockfile -r 0 "$LOCK"; then
    # see: http://apple.stackexchange.com/a/79504
    osascript -e "display notification \"Rating failed: Another instance is running\" with title \"MPD rating failed\""
    exit 1
fi
# Release the lock on every exit path from here on, including failures.
trap 'rm -f "$LOCK"' EXIT

## Strip "file " from the output
SONG=${SONG/file \///}

## Temporary file for grepping and sorting
TMP="$PLAYLISTDIR/tmp.m3u"

## Remove the song from all rating playlists
for n in {1..5}; do
    f="$PLAYLISTDIR/${PL_PREFIX}$n${PL_SUFFIX}"
    if [[ -f "$f" ]]; then
        # -x: only drop lines that are exactly this song, not every entry
        # that merely contains its path as a substring.
        grep -vxF -- "$SONG" "$f" > "$TMP"
        mv -f "$TMP" "$f"
    fi
done

## Append the song to the new rating playlist
if [[ $1 -ne 0 ]]; then
    f="$PLAYLISTDIR/${PL_PREFIX}$1${PL_SUFFIX}"
    mkdir -p "$PLAYLISTDIR"
    echo "$SONG" >> "$f"
    sort -u "$f" -o "$TMP"
    mv -f "$TMP" "$f"
fi

## The lock file is removed by the EXIT trap when the script ends
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import locale
import socket
import copy
import traceback
import collections
import lxml.html
import taglib
import mutagen
from mutagen.mp3 import MP3
from mutagen.id3 import RVA2, TXXX
from mutagen.apev2 import APEv2
from colorama import init, Fore, Back
init(autoreset=True)
# Root of the music library: the host named "r2d2" has it under /mnt,
# every other host is expected to have it mounted under /Volumes.
MUSIC_LIBRARY_PATH = (
    "/mnt/tank/music"
    if socket.gethostname() == "r2d2"
    else "/Volumes/tank_music"
)
def log_success(msg):
    """Print ``msg`` to stdout in green."""
    line = Fore.GREEN + msg
    print(line)
def log_info(msg):
    """Print ``msg`` to stdout in blue."""
    line = Fore.BLUE + msg
    print(line)
def log_failure(msg):
    """Print ``msg`` to stdout in red."""
    line = Fore.RED + msg
    print(line)
# ReplayGain tag names handled by this script. The order is significant:
# write() pairs these names positionally (via zip) with the values it
# computes, and also uses the tuple to find existing tags to delete.
RG_TAGS = (
'REPLAYGAIN_ALBUM_GAIN',
'REPLAYGAIN_ALBUM_PEAK',
'REPLAYGAIN_ALBUM_RANGE',
'REPLAYGAIN_TRACK_GAIN',
'REPLAYGAIN_TRACK_PEAK',
'REPLAYGAIN_TRACK_RANGE',
'REPLAYGAIN_REFERENCE_LOUDNESS',
'REPLAYGAIN_ALGORITHM'
)
def check(filename):
    """Return True if the file carries track ReplayGain data.

    Non-MP3 files only need the ``replaygain_track_peak`` and
    ``replaygain_track_gain`` tags. MP3 files must additionally carry the
    matching ``TXXX`` frames, an ``RVA2:track`` frame and the APEv2 variants
    of both tags.

    Any error while opening or inspecting the file (including an
    unrecognised file type) is treated as "not tagged" and yields False.
    """
    try:
        tags = mutagen.File(filename)
        if not isinstance(tags, MP3):
            return 'replaygain_track_peak' in tags and 'replaygain_track_gain' in tags
        if 'TXXX:replaygain_track_peak' not in tags or 'TXXX:replaygain_track_gain' not in tags:
            return False
        if 'RVA2:track' not in tags:
            return False
        ape_tags = APEv2(filename)
        return 'replaygain_track_peak' in ape_tags and 'replaygain_track_gain' in ape_tags
    except Exception:
        # Deliberately best-effort, but no longer a bare ``except`` so that
        # KeyboardInterrupt / SystemExit are not swallowed.
        return False
def read(filename):
    """Return ``(peak, gain)`` for a file; either element may be None.

    The values are first read through mutagen's "easy" interface. Only the
    values that are still missing afterwards are taken from the APEv2 tags,
    so a value found by the first source is never discarded.
    """
    def parse_rg(tags):
        # Extract (peak, gain) as floats from a tag mapping; missing or
        # malformed entries are returned as None.
        p = g = None
        if 'replaygain_track_peak' in tags:
            p = float(tags['replaygain_track_peak'][0])
        if 'replaygain_track_gain' in tags:
            value = tags['replaygain_track_gain'][0]
            if value.endswith(' dB'):
                g = float(value[:-3])
            else:
                log_failure('Malformed track gain info: "%s" in %s' % (value, filename))
        return (p, g)

    peak = gain = None
    try:
        peak, gain = parse_rg(mutagen.File(filename, easy=True))
    except Exception:
        # Best effort: fall through to the APEv2 tags below.
        pass

    # Prefer the first value because RVA2 is more precise than
    # APE, formatted as %.2f.
    if peak is None or gain is None:
        try:
            ape_peak, ape_gain = parse_rg(APEv2(filename))
        except Exception:
            ape_peak = ape_gain = None
        if peak is None:
            peak = ape_peak
        if gain is None:
            gain = ape_gain
    return (peak, gain)
def write(filename, peak, gain, range_, ref_loudness=-18.0, algorithm="ITU-R BS.1770"):
    """Write ReplayGain tags to ``filename``.

    ``peak``, ``gain`` and ``range_`` are each an ``(album, track)`` pair of
    numbers. Existing ReplayGain tags (any key ending in one of RG_TAGS,
    case-insensitively) are removed before the new ones are written.

    Returns True on success. Any error while tagging is logged with its
    traceback and reported as False.

    Raises:
        Exception: if ``peak`` or ``gain`` is None.
    """
    if peak is None:
        raise Exception('peak is None')
    elif gain is None:
        raise Exception('gain is None')
    try:
        tags = mutagen.File(filename)
        # album and track data
        # see: http://wiki.hydrogenaud.io/index.php?title=ReplayGain_2.0_specification#ID3v2
        # uppercase or lowercase tags (think that mpd uses lowercase)
        # - see: http://getmusicbee.com/forum/index.php?topic=10394.msg73234#msg73234
        # - see: http://mpd.wikia.com/wiki/Hack:rg.py
        # write the same tags as bs1770gain
        # Note: bs1770gain uses "LU" in gain tags, specification proposes "dB". Usually they can be
        # converted 1:1 (1 LU == 1 dB)
        a_gain, t_gain = ("{:.2f} dB".format(n) for n in gain)
        a_peak, t_peak = ("{:.6f}".format(n) for n in peak)
        a_range, t_range = ("{:.2f}".format(n) for n in range_)
        # Tag values must be text; format the reference loudness like the
        # other values instead of handing a raw float to the tag backend.
        if not isinstance(ref_loudness, str):
            ref_loudness = "{:.2f}".format(ref_loudness)
        # Same order as RG_TAGS: both tuples are zipped together below.
        data = (
            a_gain,
            a_peak,
            a_range,
            t_gain,
            t_peak,
            t_range,
            ref_loudness,
            algorithm,
        )
        # delete old tags (list() because the mapping is mutated while looping)
        for key in list(tags.keys()):
            if key.upper().endswith(RG_TAGS):
                tags.pop(key, None)
        if isinstance(tags, MP3):
            # ID3v2.4: one TXXX frame per tag.
            # NOTE: RVA2 frames and additional APEv2 tags are intentionally
            # not written for MP3 files.
            for key, value in zip(RG_TAGS, data):
                tags['TXXX:{}'.format(key)] = TXXX(encoding=0, desc=key, text=[value])
        else:
            for key, value in zip(RG_TAGS, data):
                tags[key] = value
        tags.save(filename)
        return True
    except Exception:
        log_failure(traceback.format_exc())
        return False
if __name__ == "__main__":
    # Walk the library (or the folder given as first argument) and tag every
    # album folder that contains a bs1770gain result file.
    log_info("Lets go ...")
    path = MUSIC_LIBRARY_PATH
    if len(sys.argv) > 1:
        path = os.path.abspath(sys.argv[1])
    if not os.path.exists(path):
        log_failure("Root path <{}> does not exist".format(path))
        sys.exit(1)
    for folder, subfolders, files in os.walk(path):
        if "RG-Results.xml" not in files:
            continue
        # see: https://forums.mp3tag.de/lofiversion/index.php?t20881.htm
        # XMLParser is too strict and creates troubles with unescaped ampersands aso.
        # see: http://stackoverflow.com/a/26267496
        html = lxml.html.parse(os.path.join(folder, "RG-Results.xml"))
        a_gain = float(html.xpath("//album/summary/integrated/@lu")[0])
        a_peak = float(html.xpath("//album/summary/true-peak/@factor")[0])
        a_range = float(html.xpath("//album/summary/range/@lufs")[0])
        # EBU R128 has reference level of -23.0 LUFS we used -18 LUFS, the same as foobar
        ref_loudness = -18.0
        algorithm = "ITU-R BS.1770 (EBU R128)"
        for track in html.xpath("//album/track"):
            filepath = os.path.join(folder, track.xpath(".//@file")[0])
            t_gain = float(track.xpath("./integrated/@lu")[0])
            t_peak = float(track.xpath("./true-peak/@factor")[0])
            t_range = float(track.xpath("./range/@lufs")[0])
            # Pass by keyword: write() takes ``peak`` before ``gain``, and
            # passing the pairs positionally in gain/peak order swapped them.
            written = write(
                filepath,
                peak=(a_peak, t_peak),
                gain=(a_gain, t_gain),
                range_=(a_range, t_range),
                ref_loudness=ref_loudness,
                algorithm=algorithm,
            )
            if written:
                log_success("SUCCESS: Written tags for <{}>".format(filepath))
            else:
                log_failure("ERROR: Couldn't write replay gain tags for <{}>".format(filepath))
# examine result with
# - (bs1770gain): "bs1770gain ~/Downloads/Back\ in\ Black -l"
# - (pytaglib): "pyprinttags ~/Downloads/Back\ in\ Black/01\ Hells\ Bells.mp3"
# - (mutagen): "mid3v2 ~/Downloads/Back\ in\ Black/01\ Hells\ Bells.mp3""
# - (beet); "beet info ~/Downloads/Back\ in\ Black/01\ Hells\ Bells.mp3"
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Explanation of the bs1770gain command used in the run_worker function:
1) PRIORITY OF THE PROCESS
"nice -n XX" sets the desired priority to the bs1770gain process. The
higher the parameter, the lower the system priority
2) LOUDNESS CALCULATIONS
- For a good general guide on loudness, see http://transom.org/2015/the-audio-producers-guide-to-loudness/
- For the specs of ITU-R BS.1770 see: http://www.itu.int/rec/R-REC-BS.1770/en
The command calculates the integrated loudness (-i/--integrated), which is a decent predictor of consistency between pieces of audio. It additionally calculates the -s/--shortterm and -m/--momentary loudness because they are not costly and may be used for other purposes.
Additionally, it calculates the loudness range (-r/--range), which is a measure of the variation of loudness over the course of a song.
3) PEAK CALCULATIONS
There are two possibilities to find the peak value of the audio file. -p/--samplepeak and -t/--truepeak. Both are good values but a true peak respects that peaks can lie between samples and are often higher than the peaks shown/calculated by a sample peak meter/algorithm. In doing so, the calculation time is much higher compared to simpler samplepeak calculations. On a simple album -p/--samplepeak takes about 9secs, whereas -t/--truepeak takes 30secs. For more information see https://auphonic.com/blog/2012/08/02/loudness-measurement-and-normalization-ebu-r128-calm-act/.
Once --truepeak is being calculated, the additional cost of computing --samplepeak is negligible; the other
options add little overhead as well.
With -t/--truepeak:
time bs1770gain -t ~/Downloads/XXX
...
real 0m32.861s
user 0m32.550s
sys 0m0.302s
time bs1770gain -ismrpt ~/Downloads/XXX
...
real 0m34.068s
user 0m33.761s
sys 0m0.302s
Using -p/--samplepeak:
time bs1770gain -p ~/Downloads/XXX
...
real 0m9.322s
user 0m9.048s
sys 0m0.270s
time bs1770gain -ismrp ~/Downloads/XXX
...
real 0m10.233s
user 0m9.968s
sys 0m0.262s
4) NORMALIZATION AND LOUDNESS VALUE
Usually, EBU R128 algorithm normalizes to -23.0 LUFS. This value is typically for TV and radio streams but often experienced as
too low for podcasts and audio listening at home. See https://auphonic.com/blog/2013/01/07/loudness-targets-mobile-audio-podcasts-radio-tv/. The average program loudness revolves around -19 LUFS, ReplayGain2 is around -18 LUFS and that is what Foobar2000
uses, see http://forum.doom9.org/showpost.php?p=1701328&postcount=21 and other comments in this thread. Also see https://forum.dbpoweramp.com/showthread.php?29262-Replay-Gain-dbPoweramp-vs-Foobar&p=143105&viewfull=1#post143105.
By default this script goes with the foobar2000 settings by using "--norm -18.0". For example, this changes the "lu" value in the resulting XML from e.g. (-23.0 LUFS) <integrated lufs="-10.76" lu="-12.24" />
to (-18.0 LUFS) <integrated lufs="-10.76" lu="-7.24" />.
"""
import os
import sys
import multiprocessing
import threading
import subprocess
import signal
import shlex
import time
import socket
import traceback
import unicodedata
import logging
import argparse
from time import perf_counter
from io import StringIO
import lxml.html
import taglib
import colorama
from colorama import init, Fore
init(autoreset=True)
###########################################################
# C O N F I G U R A T I O N
###########################################################
# Root of the music library: the host named "r2d2" (compared
# case-insensitively) has it under /mnt, every other host is expected to
# have it mounted under /Volumes.
MUSIC_LIBRARY_PATH = (
    "/mnt/tank/music"
    if socket.gethostname().lower() == "r2d2"
    else "/Volumes/tank_music"
)
# Utilize only half of the processors, but never fewer than one:
# multiprocessing.Pool refuses a worker count of 0, which the plain
# division produced on single-core hosts.
NUM_CPUS = max(1, multiprocessing.cpu_count() // 2)
# The linux "nice" priority number passed to "nice -n" when bs1770gain is
# launched (the higher the number, the lower the system priority)
WORKER_PROCESS_PRIORITY = 19
# Algorithm and associated default normalization LUFS (reference loudness)
# Each entry is a (bs1770gain method flag, default reference loudness) pair
CALC_METHODS = (
("ebu", -23.0), # EBU R128 (default)
("atsc", -24.0), # ATSC A/85
("replaygain", -18.0) # ReplayGain 2.0
)
# Human readable names of the methods above; used to build the value of
# the REPLAYGAIN_ALGORITHM tag
CALC_ALGORITHM_FULL = {
"ebu": "EBU R128",
"atsc": "ATSC A/85",
"replaygain": "ReplayGain 2.0"
}
# Choose the algorithm
CALC_METHOD = CALC_METHODS[0]
# Override default normalization LUFS (reference loudness)
# Only honoured when the value is a float (see calc_vars)
FORCE_REFERENCE_LOUDNESS = -18.0
# The maximum time in seconds that the calculation process is allowed
# to take before terminating it. It sometimes happens that
# bs1770gain hangs at a specific album. 300 seconds (5min) should be
# good for most audio files
MAX_CALCULATION_DURATION = 300
# File extensions (lower case, with leading dot) that are treated as
# audio files. Taken from Wikipedia
AUDIO_EXTS = (
".3gp",
".aa",
".aac",
".aax",
".act",
".aiff",
".aif",
".amr",
".ape",
".au",
".awb",
".dct",
".dss",
".dvf",
".flac",
".gsm",
".iklax",
".ivs",
".m4a",
".m4b",
".m4p",
".mmf",
".mp3",
".mpc",
".msv",
".ogg",
".oga",
".opus",
".ra",
".rm",
".raw",
".sln",
".vox",
".wav",
".wma",
".wv",
".webm"
)
# ReplayGain tag names written to the audio files. The order is significant:
# ReplayGainResult.id3_tags pairs them positionally with its values, and
# is_tagged() reads entries 6 (reference loudness) and 7 (algorithm) by index.
RG_TAGS = (
'REPLAYGAIN_ALBUM_GAIN',
'REPLAYGAIN_ALBUM_PEAK',
'REPLAYGAIN_ALBUM_RANGE',
'REPLAYGAIN_TRACK_GAIN',
'REPLAYGAIN_TRACK_PEAK',
'REPLAYGAIN_TRACK_RANGE',
'REPLAYGAIN_REFERENCE_LOUDNESS',
#'QUODLIBET::REPLAYGAIN_REFERENCE_LOUDNESS',
'REPLAYGAIN_ALGORITHM'
)
# Name of the XML result file that bs1770gain writes into each album
# folder and that the tagging step reads back
RG_RESULT_FILE = "RG-Results.xml"
# Save the current processed folder for resuming ...
# (base name of the state files; log_path() appends ".<postfix>.txt")
PROCESSED_STATE_FILE = 'RG-Processed'
# Exclude folders from processing
# (matched as path prefixes by get_folders)
EXCLUDES = (
os.path.join(MUSIC_LIBRARY_PATH, "Dessou's Club"),
)
# bs1770gain uses "LU" in its replaygain tags,
# whereas replaygain2 specification proposes "dB".
# Usually they can be converted 1:1 (1 LU == 1 dB)
# see: https://sourceforge.net/p/idjc/bugs/79/
# MPD supports "LU" so we go with them
LOUDNESS_UNIT = "LU" # "dB"
# The root path from which all relative paths will be calculated
# Will be set in the __main__ section
ROOT_PATH = None
###########################################################
# L O G G I N G
###########################################################
# A new success level and colored logger
# see: https://gist.github.com/hit9/5635505
# see: https://gist.github.com/kergoth/813057
# 25 lies between the standard INFO (20) and WARNING (30) levels
logging.SUCCESS = 25
logging.addLevelName(logging.SUCCESS, 'SUCCESS')
class ColorizingStreamHandler(logging.StreamHandler):
    """Stream handler that colours the first line of each record by level."""

    # Level number -> colour prefix; levels without an entry stay uncoloured.
    color_map = {
        logging.DEBUG: Fore.WHITE,
        logging.INFO: Fore.BLUE,
        logging.WARNING: Fore.YELLOW,
        logging.ERROR: Fore.RED,
        logging.CRITICAL: Fore.RED,
        logging.SUCCESS: Fore.GREEN
    }

    def __init__(self, stream, color_map=None):
        """Wrap ``stream`` with colorama and optionally replace the colour map."""
        wrapped = colorama.AnsiToWin32(stream).stream
        super().__init__(wrapped)
        if color_map is not None:
            self.color_map = color_map

    @property
    def is_tty(self):
        """Truthy when the underlying stream reports itself as a terminal."""
        probe = getattr(self.stream, 'isatty', None)
        if not probe:
            return probe
        return probe()

    def format(self, record):
        """Format ``record``; on a terminal colour only its first line."""
        message = super().format(record)
        if not self.is_tty:
            return message
        # Don't colorize a traceback: everything after the first newline
        # is passed through untouched.
        head, newline, rest = message.partition('\n')
        return self.colorize(head, record) + newline + rest

    def colorize(self, message, record):
        """Return ``message`` wrapped in the colour mapped to the record's level."""
        if record.levelno not in self.color_map:
            return message
        prefix = self.color_map[record.levelno]
        return prefix + message + colorama.Style.RESET_ALL
logger = logging.getLogger("Core")
t_logger = logging.getLogger("Tags")
c_logger = logging.getLogger("Calc")


def setup_logging():
    """Attach console and file handlers to the three module loggers.

    Every logger gets a colourised stdout handler, a handler writing to
    "replaygain.log", the DEBUG level and a ``success(message, *args)``
    method that logs at the custom SUCCESS level.
    """
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    console_handler = ColorizingStreamHandler(sys.stdout)
    console_handler.setFormatter(formatter)
    file_handler = logging.FileHandler("replaygain.log")
    file_handler.setFormatter(formatter)

    def make_success(target):
        # Bind ``target`` per logger. A lambda created inside the loop
        # captures the loop variable late, so every ``success`` method
        # would log through the last logger of the loop.
        def success(message, *args):
            target._log(logging.SUCCESS, message, args)
        return success

    for log in (logger, t_logger, c_logger):
        log.addHandler(console_handler)
        log.addHandler(file_handler)
        log.setLevel(logging.DEBUG)
        # add a success method to the logger instance
        log.success = make_success(log)


setup_logging()
###########################################################
# E X E P T I O N S
###########################################################
class WorkerException(Exception):
    """Base class for worker errors; carries a message and the affected path."""

    def __init__(self, message, path):
        # Hand both values to Exception so that ``args`` contains everything
        # needed to rebuild the instance when it is pickled.
        # see: http://stackoverflow.com/a/28335286
        super().__init__(message, path)
        self.message = message
        self.path = path


class TaggingException(WorkerException):
    """Worker error raised by the tagging step."""


class CalculatingException(WorkerException):
    """Worker error raised by the calculation step."""
###########################################################
# T A G G I N G
#
# examine result with
# - (bs1770gain): "bs1770gain ~/Downloads/Back\ in\ Black -l"
# - (pytaglib): "pyprinttags ~/Downloads/Back\ in\ Black/01\ Hells\ Bells.mp3"
# - (mutagen): "mid3v2 ~/Downloads/Back\ in\ Black/01\ Hells\ Bells.mp3""
# - (beet); "beet info ~/Downloads/Back\ in\ Black/01\ Hells\ Bells.mp3"
###########################################################
class ReplayGainData(object):
    """Numeric gain/peak/range measurements exposed as tag-ready strings."""

    def __init__(self, gain, peak, range_):
        self._gain, self._peak, self._range = gain, peak, range_

    def _with_unit(self, value):
        # Two decimals followed by the configured loudness unit.
        return "{:.2f} {}".format(value, LOUDNESS_UNIT)

    @property
    def gain(self):
        """Gain formatted with two decimals and the loudness unit."""
        return self._with_unit(self._gain)

    @property
    def peak(self):
        """Peak formatted with six decimals (no unit)."""
        return "{:.6f}".format(self._peak)

    @property
    def range(self):
        """Loudness range formatted with two decimals and the loudness unit."""
        return self._with_unit(self._range)
class ReplayGainResult(object):
    """ReplayGain values of one track together with its album's values.

    ``album`` and ``track`` are ReplayGainData-like objects exposing
    ``gain``, ``peak`` and ``range`` strings. ``ref_loudness`` and
    ``algorithm`` are optional; when omitted the module configuration
    (CALC_METHOD / FORCE_REFERENCE_LOUDNESS) is used.
    """

    def __init__(self, path, album, track, ref_loudness=None, algorithm=None):
        self.path = path
        self.album = album
        self.track = track
        self._ref_loudness = ref_loudness
        self._algorithm = algorithm

    @property
    def ref_loudness(self):
        """Reference loudness formatted with two decimals."""
        ref_loudness = self._ref_loudness
        # ``is None`` rather than a truthiness test: 0.0 is a legitimate
        # explicit value and must not fall back to the configured default.
        if ref_loudness is None:
            _, ref_loudness = CALC_METHOD
            if isinstance(FORCE_REFERENCE_LOUDNESS, float):
                ref_loudness = FORCE_REFERENCE_LOUDNESS
        return "{:.2f}".format(ref_loudness)

    @property
    def algorithm(self):
        """Algorithm description, e.g. ``ITU-R BS.1770 (EBU R128)``."""
        algorithm = self._algorithm
        if not algorithm:
            algorithm, _ = CALC_METHOD
        # Map the short method name to its full name; a value that is not a
        # known short name is used as given instead of raising KeyError.
        algorithm = CALC_ALGORITHM_FULL.get(algorithm, algorithm)
        return "ITU-R BS.1770 ({})".format(algorithm)

    @property
    def id3_tags(self):
        """Mapping of every RG_TAGS name to its formatted value."""
        # Same order as RG_TAGS.
        values = (
            self.album.gain,
            self.album.peak,
            self.album.range,
            self.track.gain,
            self.track.peak,
            self.track.range,
            self.ref_loudness,
            self.algorithm,
        )
        return dict(zip(RG_TAGS, values))
def get_replaygain_xml_data(path, ref_loudness=None, algorithm=None):
    """Parse the bs1770gain result file of an album folder.

    Reads ``RG_RESULT_FILE`` inside ``path`` and returns a list with one
    ReplayGainResult per ``<track>`` element, each combining the album
    summary values with the track's own values. ``ref_loudness`` and
    ``algorithm`` are passed through to every result.

    Raises:
        TaggingException: if the file cannot be read or decoded, a required
            tag/attribute is missing, or a value is not a number.
    """
    xml_path = os.path.join(path, RG_RESULT_FILE)

    # Unicode normalization between OSX and Linux (Ubuntu)
    # There is a special case if bs1770gain is executed on a
    # OSX box and both filesystems, on OSX and Linux, are UTF-8
    # and, therefore, unicode. Linux and Windows use NFC whereas
    # OSX uses NFD unicode.
    # see: https://en.wikipedia.org/wiki/Unicode_equivalence
    # see: http://nedbatchelder.com/blog/201106/filenames_with_accents.html
    # LXML does not handle this special case very well and does
    # not autoswitch between NFC and NFD. This is especially important
    # for file and directory names.
    #
    # Sidenote: If OSX has problems to display the correct filenames of
    # a SAMBA share, check if vfs_fruit is enabled and configured properly.
    # see: https://www.mankier.com/8/vfs_fruit
    # see: https://lists.samba.org/archive/samba/2014-September/184761.html
    # see: https://lists.samba.org/archive/samba/2014-December/187568.html
    #
    # Here are some possibilities
    # 1) Use rsync to transfer the file from OSX over ssh to the
    #    Linux box
    #    rsync -a --iconv=utf-8-mac,utf-8 localdir/ server:remotedir/
    #    see: http://serverfault.com/a/427200
    # 2) Convert the NFD unicode file on the OSX box to NFC
    #    iconv -f UTF-8-MAC -t UTF-8 RG-Results.xml > RG-Results.conv.xml
    #    see: http://stackoverflow.com/q/14682829
    #    uconv -f utf8 -t utf8 -x nfc RG-Results.xml -o RG-Results.conv.xml
    #    see: https://www.win.tue.nl/~aeb/linux/uc/nfc_vs_nfd.html
    # 3) Convert all NFD filenames on the Linux box to NFC
    #    convmv -f utf8 -t utf8 --nfc --replace --nosmart (--notest) -r uploads/
    #    see: https://gist.github.com/dessibelle/4685735
    # 4) Use the python solution below via unicodedata.normalize("NFC", value)
    #    and pass a StringIO to lxml.parse
    normal_form = "NFD" if sys.platform.startswith('darwin') else "NFC"
    try:
        with open(xml_path, "r", encoding="utf-8") as f:
            data = unicodedata.normalize(normal_form, f.read())
        # There are special characters like "&" in the bs1770gain result file that
        # are not escaped to conform XML standards. Parse the file with a not so
        # strict html parser.
        # see: http://stackoverflow.com/a/26267496
        xml = lxml.html.parse(StringIO(data))
    except UnicodeDecodeError as e:
        raise TaggingException('<{}> is not valid UTF-8'.format(RG_RESULT_FILE), path) from e
    except IOError as e:
        raise TaggingException('<{}> could not be found'.format(RG_RESULT_FILE), path) from e

    # How to link the values of the result file to replaygain values?
    # see: https://forums.mp3tag.de/lofiversion/index.php?t20881.htm
    def get_value(node, value):
        # First match of the xpath expression, or a TaggingException.
        try:
            return node.xpath(value)[0]
        except IndexError as e:
            raise TaggingException("<{}> seems to be malformed and missing the <{}> tag/attribute".format(RG_RESULT_FILE, value), path) from e

    def get_float(node, value):
        # Like get_value, but converted to float. A non-numeric attribute is
        # reported as TaggingException so the failure still carries ``path``.
        raw = get_value(node, value)
        try:
            return float(raw)
        except ValueError as e:
            raise TaggingException("<{}> seems to be malformed, <{}> is not a number".format(RG_RESULT_FILE, value), path) from e

    a_data = ReplayGainData(
        get_float(xml, "//album/summary/integrated/@lu"),
        get_float(xml, "//album/summary/true-peak/@factor"),
        get_float(xml, "//album/summary/range/@lufs")
    )
    results = []
    for track in xml.xpath("//album/track"):
        t_path = os.path.join(path, get_value(track, ".//@file"))
        t_data = ReplayGainData(
            get_float(track, "./integrated/@lu"),
            get_float(track, "./true-peak/@factor"),
            get_float(track, "./range/@lufs")
        )
        results.append(ReplayGainResult(t_path, a_data, t_data, ref_loudness, algorithm))
    return results
def clear_replaygain_id3():
    """Placeholder without behaviour; always returns None."""
    return None
def write_replaygain_id3(data):
    """Write the ReplayGain tags of ``data`` into its audio file.

    ``data`` must expose ``path`` and ``id3_tags`` (tag name -> value).
    Files whose extension is not in AUDIO_EXTS are skipped with a warning.

    Raises:
        TaggingException: if the file does not exist or saving fails.
    """
    if not os.path.exists(data.path):
        raise TaggingException("Audio file does not exist in folder", data.path)
    # sometimes bs1770gain also includes non audio files in its resulting xml
    # with nonsense data, of course
    # The extension is lower-cased so that e.g. "SONG.MP3" is treated like
    # everywhere else in this module (get_folders, validate).
    if os.path.splitext(data.path)[1].lower() not in AUDIO_EXTS:
        t_logger.warning("File <{}> has not a valid audio file extension. Skip tagging".format(data.path))
        return
    f = None
    try:
        f = taglib.File(data.path)
        # @todo: there are strange problems with m4a files and
        # tagging. It seems that taglib does not write tags for
        # m4a files. Maybe issue a github ticket?
        # remove unsupported tag names (like from itunes ...)
        if f.unsupported:
            t_logger.debug("Removing unsupported tags <{}>".format(f.unsupported))
            f.removeUnsupportedProperties(f.unsupported)
        existing = {key.upper() for key in f.tags}
        for name, value in data.id3_tags.items():
            if name in existing:
                # delete possible lowercase duplicate tag,
                # the uppercase one will be overwritten anyways ...
                f.tags.pop(name.lower(), None)
            # then save new tags
            f.tags[name] = value
        f.save()
    except Exception as e:
        raise TaggingException("Failed saving tags of audio file", data.path) from e
    finally:
        # close the file anyways
        if f:
            f.close()
def get_audio_files(folder, extensions=None):
    """Yield the names of the visible audio files directly inside ``folder``.

    A name is yielded when it does not start with a dot, its lower-cased
    extension is in ``extensions`` (defaults to AUDIO_EXTS) and it is a
    regular file. Only the file name is yielded, not the full path.
    """
    if extensions is None:
        extensions = AUDIO_EXTS
    for name in os.listdir(folder):
        if name.startswith("."):
            continue
        if os.path.splitext(name)[1].lower() not in extensions:
            continue
        # os.listdir returns bare names; they must be joined with the folder
        # before testing, otherwise the check runs against the CWD.
        if os.path.isfile(os.path.join(folder, name)):
            yield name
def calc_vars():
    """Return ``(reference loudness, method name)`` from the configuration.

    The loudness comes from CALC_METHOD unless FORCE_REFERENCE_LOUDNESS is
    a float, in which case that value wins.
    """
    method_name, default_loudness = CALC_METHOD
    forced = isinstance(FORCE_REFERENCE_LOUDNESS, float)
    loudness = FORCE_REFERENCE_LOUDNESS if forced else default_loudness
    return (loudness, method_name)
def start_tagging(folder, ref_loudness, algorithm):
    """Tag every track listed in the result file of ``folder``.

    Raises TaggingException when the number of audio files in the folder
    differs from the number of tracks in the result file.
    """
    t_logger.debug("Tagging audio files of folder <{}>".format(folder))
    began = perf_counter()
    results = get_replaygain_xml_data(folder, ref_loudness, algorithm)
    if results:
        # Assume that we have the same amount of audio files
        # in the folder and in the result XML
        found = len(list(get_audio_files(folder)))
        expected = len(results)
        if found != expected:
            raise TaggingException("The amount of audio files <{}> does not match the amout of files in the XML file <{}>".format(found, expected), folder)
        for entry in results:
            write_replaygain_id3(entry)
    elapsed = perf_counter() - began
    t_logger.debug("Finished tagging <{}> files in <{:.2f}> seconds, path <{}>".format(len(results), elapsed, folder))
###########################################################
# C A L C U L A T I N G
###########################################################
def start_calculating(folder, ref_loudness, algorithm):
    """Run bs1770gain on ``folder`` and let it write the XML result file.

    ``algorithm`` is the bs1770gain method flag without the leading dashes
    and ``ref_loudness`` the value passed to ``--norm``.

    Raises:
        CalculatingException: if bs1770gain exits with a non-zero status or
            runs longer than MAX_CALCULATION_DURATION seconds.
    """
    c_logger.debug("Calculating replay gain for folder <{}>".format(folder))
    start = perf_counter()
    # measure all -i/--integrated, -s/--shortterm, -m/--momentary, -r/--range, -p/--samplepeak, and -t/--truepeak
    # The command is built as an argument list rather than a quoted string
    # run through shlex.split, so folder names containing quotes or
    # backslashes reach bs1770gain unchanged.
    command = [
        "nice", "-n", str(WORKER_PROCESS_PRIORITY),
        "bs1770gain", folder,
        "-ismrpt",
        "--{}".format(algorithm),
        "--norm", str(ref_loudness),
        "--xml",
        "-f", os.path.join(folder, RG_RESULT_FILE),
    ]
    try:
        # Run until maximum calculation time is reached
        subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            # if the process takes longer than this timeout (secs)
            # raise a TimeoutExpired exception
            timeout=MAX_CALCULATION_DURATION,
            # if return code is not zero raise a CalledProcessError exception
            check=True
        )
    except subprocess.CalledProcessError as e:
        raise CalculatingException("bs1770gain failed calculating replay gain data", folder) from e
    except subprocess.TimeoutExpired as e:
        raise CalculatingException("bs1770gain was terminated because it took longer than the maximum execution time", folder) from e
    c_logger.debug("Finished calculating in {:.2f} seconds, path <{}>".format(perf_counter() - start, folder))
###########################################################
# M U L T I P R O C E S S I N G
###########################################################
def init_worker():
    """Pool initializer: make the calling process ignore SIGINT."""
    ignore = signal.SIG_IGN
    signal.signal(signal.SIGINT, ignore)
def run_worker(folder, calculating, tagging):
    """Process one folder inside a worker process.

    Optionally calculates the replay gain data and/or tags the files,
    depending on the two flags. Returns ``(folder, pid)`` of the worker.
    """
    current = multiprocessing.current_process()
    began = perf_counter()
    logger.debug("Running worker, folder <{}>, name <{}>, pid <{}>".format(folder, current.name, current.pid))

    # get normalization vars
    ref_loudness, algorithm = calc_vars()

    if calculating:
        logger.debug("Starting to calculate replay gain for folder <{}> ...".format(folder))
        start_calculating(folder, ref_loudness, algorithm)
    if tagging:
        logger.debug("Starting to tag folder <{}> ...".format(folder))
        start_tagging(folder, ref_loudness, algorithm)

    elapsed = perf_counter() - began
    logger.debug("Finished worker, folder <{}>, name <{}>, pid <{}>, secs <{:.2f}>".format(folder, current.name, current.pid, elapsed))
    return (folder, current.pid)
def log_path(postfix):
    """Return the path of the state file for ``postfix`` below ROOT_PATH."""
    base = os.path.join(ROOT_PATH, PROCESSED_STATE_FILE)
    template = "{}.{}.txt"
    return template.format(base, postfix)
def get_folders(path):
    """Return the set of folders below ``path`` that contain audio files.

    Folders are returned relative to ROOT_PATH. Folders starting with one
    of EXCLUDES are skipped, and so is ROOT_PATH itself when ``path`` is
    the configured MUSIC_LIBRARY_PATH.
    """
    # find all file extension within a directory
    # see: http://stackoverflow.com/a/4998326/1230358
    # find /mnt/tank/music/ -type f -name '*.*' | sed 's|.*\.||' | sort -u
    # only the folders that contain media files are collected
    # see: http://stackoverflow.com/a/9997442/1230358
    found = set()
    for folder, _subfolders, names in os.walk(path):
        # Skip special folders
        if folder.startswith(EXCLUDES):
            continue
        # Skip root folder
        if path == MUSIC_LIBRARY_PATH and folder == ROOT_PATH:
            continue
        # check if we have valid audio files in directory
        has_audio = any(
            os.path.splitext(name)[-1].lower() in AUDIO_EXTS
            for name in names
        )
        if has_audio:
            found.add(os.path.relpath(folder, ROOT_PATH))
    return found
def get_folders_processed(path):
    """Return the set of folders recorded in the "success" state file.

    Each non-blank line is expected to look like ``<mode>☢☢<folder>``.
    Lines without the separator are skipped instead of raising IndexError.
    If the state file cannot be opened, a warning mentioning ``path`` is
    logged and an empty set is returned.
    """
    def parse(lines):
        for line in lines:
            if not line.strip():
                continue
            parts = line.split("☢☢")
            if len(parts) < 2:
                # malformed line (e.g. a truncated write): ignore it
                continue
            yield parts[1].strip()

    try:
        with open(log_path("success"), 'r') as f:
            return set(parse(f))
    except IOError:
        logger.warning("Could not gather already processed file for folder <{}>".format(path))
        return set()
def is_tagged(path, ref_loudness, algorithm):
    """Return True if the file's tags match the given loudness and algorithm.

    Compares the REPLAYGAIN_REFERENCE_LOUDNESS tag (RG_TAGS[6], as float)
    with ``ref_loudness`` and the REPLAYGAIN_ALGORITHM tag (RG_TAGS[7]) with
    ``algorithm``. A missing, empty or non-numeric tag counts as not tagged.
    """
    f = taglib.File(path)
    try:
        return (
            float(f.tags[RG_TAGS[6]][0]) == float(ref_loudness)
            and f.tags[RG_TAGS[7]][0] == algorithm
        )
    except (KeyError, IndexError, ValueError):
        # KeyError: tag missing, IndexError: tag present but empty,
        # ValueError: stored loudness is not a number.
        return False
    finally:
        f.close()
def validate(folders):
    """Log an error for every audio file lacking the expected tags.

    ``folders`` are paths relative to ROOT_PATH. A folder without a result
    file is reported once; otherwise every audio file below it is checked
    with is_tagged().
    """
    logger.info("Start validating")
    began = perf_counter()
    ref_loudness, short_name = calc_vars()
    # get the full algorithm name as in tags
    algorithm = "ITU-R BS.1770 ({})".format(CALC_ALGORITHM_FULL[short_name])
    for folder in folders:
        folder_abs = os.path.join(ROOT_PATH, folder)
        if not os.path.exists(os.path.join(folder_abs, RG_RESULT_FILE)):
            logger.error("No <{}> file was found in <{}>".format(RG_RESULT_FILE, folder))
            continue
        for current, _subfolders, names in os.walk(folder_abs):
            for name in names:
                if os.path.splitext(name)[-1].lower() not in AUDIO_EXTS:
                    continue
                candidate = os.path.join(current, name)
                if not is_tagged(candidate, ref_loudness, algorithm):
                    logger.error("Audio file <{}> has no valid replay gain tags".format(candidate))
    logger.info("Finished validating, secs <{:.2f}>".format(perf_counter() - began))
# http://stackoverflow.com/questions/21159103/what-kind-of-problems-if-any-would-there-be-combining-asyncio-with-multiproces
# http://chriskiehl.com/article/parallelism-in-one-line/
# http://stackoverflow.com/a/11623718
def main(folders, calculating=False, tagging=False):
    """Process ``folders`` in a pool of NUM_CPUS worker processes.

    ``folders`` are paths relative to ROOT_PATH. Each folder is handed to
    run_worker() together with the two mode flags. Finished folders are
    appended to the "success" state file, failed ones to the "error" state
    file. A KeyboardInterrupt terminates the pool.
    """
    logger.debug("Initializing <{}> workers".format(NUM_CPUS))
    pool = multiprocessing.Pool(NUM_CPUS, init_worker)
    # calculating(True/False):tagging(True/False)
    mode = "{}:{}".format(calculating, tagging)

    # These callbacks run in the main process not in the worker processes
    # but in their own threads. So care for thread safety in them!
    success_lock = threading.Lock()

    def worker_on_success(result):
        folder, _pid = result
        logger.success('Worker <{}> finished!'.format(folder))
        # ensure unique write access; ``with`` releases the lock even if
        # writing the state file fails
        with success_lock:
            with open(log_path("success"), "a+") as f:
                f.write("{}☢☢{}\n".format(mode, os.path.relpath(folder, ROOT_PATH)))

    error_lock = threading.Lock()

    def worker_on_error(e):
        if isinstance(e, TaggingException):
            logger.error("Tagging failed for <{}> with message <{}>".format(e.path, e.message))
        elif isinstance(e, CalculatingException):
            logger.error("Calculating replay gain failed for <{}> with message <{}>".format(e.path, e.message))
        else:
            # exc_info attaches the traceback; logger.exception() only works
            # inside an ``except`` block, which this callback is not.
            logger.error("An unknown exception happend", exc_info=e)
        # Only worker exceptions carry a path. Reading ``e.path`` blindly
        # would raise AttributeError inside the pool's callback thread.
        failed_path = getattr(e, "path", None)
        if failed_path is None:
            return
        # ensure unique write access
        with error_lock:
            with open(log_path("error"), "a+") as f:
                f.write("{}☢☢{}\n".format(mode, os.path.relpath(failed_path, ROOT_PATH)))

    workers = []
    for folder in folders:
        # returns a future
        workers.append(pool.apply_async(
            run_worker,
            (
                os.path.normpath(os.path.join(ROOT_PATH, folder)),
                calculating,
                tagging
            ),
            callback=worker_on_success,
            error_callback=worker_on_error
        ))

    # Keyboard interrupts and multiprocessing
    # see: http://noswap.com/blog/python-multiprocessing-keyboardinterrupt
    # http://stackoverflow.com/questions/28674518/multiprocessing-pool-wait-for-all-results-but-process-individual-results-imme
    try:
        for worker in workers:
            worker.wait()
    except KeyboardInterrupt:
        logger.info("Caught KeyboardInterrupt, terminating workers ...")
        pool.terminate()
        pool.join()
    else:
        logger.info("Finished. Quitting normally ...")
        pool.close()
        pool.join()
    # kill all leftover zombie processes
    os.system('pkill bs1770gain')
if __name__ == "__main__":
    # Command line entry point: collect the folders to process and either
    # validate their tags or calculate and/or write replay gain data.
    parser = argparse.ArgumentParser(description='Process replaygain on a folder and its audio data')
    parser.add_argument('path', metavar='in-file', help='The folder to process')
    parser.add_argument('-t', '--tag', action='store_true', help='Just process id3 tagging of the files within the folder')
    parser.add_argument('-c', '--calc', action='store_true', help='Just calculate the replay gain data of the files within the folder')
    parser.add_argument('-f', '--force', action='store_true', help='Recalculate or tag already processed folders/files')
    parser.add_argument('-T', '--timeout', default=False, type=int, help='The duration (secs) the calculatioin process is allowed to take')
    parser.add_argument('-v', '--validate', action='store_true', help='Checks if all audio files within the folder have valid replay gain tags')
    args = parser.parse_args()

    # Set worker timeout
    if args.timeout:
        MAX_CALCULATION_DURATION = args.timeout
        logger.info("Setting worker timeout to <{}> seconds".format(args.timeout))

    logger.info("Lets go ...")
    path = MUSIC_LIBRARY_PATH
    if args.path:
        path = os.path.abspath(args.path)
    ROOT_PATH = path
    if not os.path.exists(path):
        logger.error("Root path <{}> does not exist".format(path))
        sys.exit(1)

    if not args.force:
        # resume the last state: skip folders already recorded as processed
        folders = sorted(get_folders(path) - get_folders_processed(path), key=str.lower)
    else:
        # totally new run
        folders = sorted(get_folders(path), key=str.lower)

    if not folders:
        logger.info("Nothing left to process")
        # Stop here: continuing would start a worker pool for nothing and
        # run the final "pkill bs1770gain" without having started any.
        sys.exit(0)

    if args.validate:
        # just validate the audio files within the folder
        validate(folders)
    else:
        # calculate or tag or both replay gain data/tags
        calc = args.calc
        tag = args.tag
        if not tag and not calc:
            # do all of them
            calc = True
            tag = True
        main(folders, calc, tag)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# see: http://stackoverflow.com/a/14879370
import os
import sys
import socket
import traceback
import lxml.etree as et
from colorama import init, Fore, Back
init(autoreset=True)
# Root of the music library; the location depends on the host this runs on.
MUSIC_LIBRARY_PATH = (
    "/mnt/tank/music" if socket.gethostname() == "r2d2" else "/Volumes/tank_music"
)
def log_success(msg):
    """Print ``msg`` prefixed with the colorama green foreground code."""
    coloured = Fore.GREEN + msg
    print(coloured)
def log_info(msg):
    """Print ``msg`` prefixed with the colorama blue foreground code."""
    coloured = Fore.BLUE + msg
    print(coloured)
def log_failure(msg):
    """Print ``msg`` prefixed with the colorama red foreground code."""
    coloured = Fore.RED + msg
    print(coloured)
class hashabledict(dict):
    """A ``dict`` that can be hashed, e.g. to be used inside a dictionary key.

    The hash is computed from the sorted ``(key, value)`` pairs, so two
    instances with equal contents hash equally regardless of insertion
    order. The hash follows the current contents and changes when they do.
    """

    def __hash__(self):
        ordered_pairs = sorted(self.items())
        return hash(tuple(ordered_pairs))
class XMLCombiner(object):
    """Merge several XML files into the tree of the first one.

    The files are parsed once in the constructor; ``combine()`` then folds
    every later tree into the first tree in place.
    """

    def __init__(self, filenames):
        """Parse every file in ``filenames`` and keep the roots in order.

        Args:
            filenames: non-empty sequence of paths (or file objects) that
                ``lxml.etree.parse`` accepts.

        Raises:
            ValueError: if ``filenames`` is empty.
        """
        # Explicit check instead of ``assert`` so it is not stripped by ``python -O``
        if not filenames:
            raise ValueError('No filenames!')
        # XMLParser is too strict and creates troubles with unescaped ampersands aso.
        # Use HTMLParser instead
        # see: http://stackoverflow.com/a/26267496
        # also strip all whitespace for pretty printing afterwards
        parser = et.HTMLParser(remove_blank_text=True)
        # save all the roots, in order, to be processed later
        self.roots = [et.parse(f, parser).getroot() for f in filenames]

    def combine(self):
        """Fold every later root into the first one.

        Returns:
            An ``ElementTree`` wrapping the (modified) first root.
        """
        target = self.roots[0]
        for r in self.roots[1:]:
            # combine each element with the first one, and update that
            self.combine_element(target, r)
        return et.ElementTree(target)

    def combine_element(self, one, other):
        """
        This function recursively updates either the text or the children
        of an element if another element is found in `one`, or adds it
        from `other` if not found.

        Children are matched on ``(tag, attributes)``. Children of `other`
        tagged ``integrated`` are ignored. `one` is modified in place.
        """
        # Create a mapping from (tag, attributes) to element, as that's what we are filtering with
        mapping = {(el.tag, hashabledict(el.attrib)): el for el in one}
        # Iterate over a snapshot: an lxml element has exactly one parent, so
        # ``one.append(el)`` below moves ``el`` out of ``other``.
        for el in list(other):
            # skip old integrated
            if el.tag == 'integrated':
                continue
            key = (el.tag, hashabledict(el.attrib))
            # Compare against None explicitly; the truth value of an element
            # depends on whether it has children.
            match = mapping.get(key)
            if match is None:
                # Not in the mapping yet: remember it and add it to `one`
                mapping[key] = el
                one.append(el)
            elif len(el) == 0:
                # Not nested: update the text
                match.text = el.text
            else:
                # Nested: recursively process the element in the same way
                self.combine_element(match, el)
if __name__ == '__main__':
    # For every folder listed in RG-Processed.txt, merge its RG-Results.xml
    # with RG-Results.xml.orig and write the result to RG-Results.xml.comb.
    # Folders that fail are appended to RG_Failure_1.txt.
    processed_list = os.path.join(MUSIC_LIBRARY_PATH, "RG-Processed.txt")
    try:
        with open(processed_list, 'r') as f:
            processed = {line.strip() for line in f}
    except (OSError, UnicodeError):
        # Best effort: a missing or unreadable list means nothing to do.
        # Other errors (and KeyboardInterrupt) are no longer swallowed.
        processed = set()
    # A blank line would otherwise be treated as the library root itself
    processed.discard("")

    for folder in sorted(processed):
        info_file = os.path.join(MUSIC_LIBRARY_PATH, folder, "RG-Results.xml")
        if not os.path.exists(info_file):
            continue
        try:
            r = XMLCombiner((info_file, "{}.orig".format(info_file))).combine()
            with open("{}.comb".format(info_file), "wb") as f:
                f.write(et.tostring(r.getroot(), pretty_print=True))
            log_success("SUCCESS: Combined info file <{}>".format(info_file))
        except Exception:
            # Per-folder boundary: report, record the folder and carry on
            log_failure("FAILURE: Combined info file <{}>".format(info_file))
            traceback.print_exc(file=sys.stdout)
            with open(os.path.join(MUSIC_LIBRARY_PATH, "RG_Failure_1.txt"), 'a+') as f:
                f.write("{}\n".format(os.path.relpath(os.path.dirname(info_file), MUSIC_LIBRARY_PATH)))
@hetsch
Copy link
Author

hetsch commented Jun 29, 2016

Splitting a single FLAC file into individual tracks using a cue sheet

see: https://wiki.archlinux.org/index.php/CUE_Splitting

cuebreakpoints *.cue | shnsplit -o flac *.flac ;
# cuebreakpoints *.cue | shnsplit -o flac -f *.cue -t "%n - %t" *.flac ;
cuetag.sh *.cue split-track*.flac
# cuetag.sh *.cue *.flac

@hetsch
Copy link
Author

hetsch commented Jul 3, 2016

rsync -a ~/bin/replaygain 192.168.1.143:/home/hetsch/.local/bin/

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment