Created
March 12, 2021 19:51
-
-
Save frederik-elwert/1e41a04d317a58b69ddb7d1882e98a3f to your computer and use it in GitHub Desktop.
Script for converting music or audio books into the format and directory structure required by TonUINO
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import sys | |
import argparse | |
import logging | |
import shutil | |
import subprocess | |
from pathlib import Path | |
MAX_DIRS = 99 | |
ALLOWED_EXTS = ['.mp3', '.ogg', '.oga', '.flac'] | |
AUDIO_QUALITY = 4 | |
class parse_merge_spec(argparse.Action): | |
"""Parse merge-spec option.""" | |
def __call__(self, parser, namespace, values, option_string): | |
"""Return list of lists of track numbers. | |
>>> parse_merge_spec()(['1-3', '4-5']) | |
[[1, 2, 3], [4, 5]] | |
""" | |
out = [] | |
for spec in values: | |
if '-' in spec: | |
# Track range | |
start, end = [int(s) for s in spec.split('-')] | |
out.append(list(range(start, end + 1))) | |
else: | |
# Single track | |
out.append([int(spec)]) | |
setattr(namespace, self.dest, out) | |
class MusicCopier: | |
"""Copy music from a source to SD Card.""" | |
def __init__(self, source, dest, log_file=None, keep_filename=False, | |
append=None, merge_spec=None, dry_run=False): | |
self.source = Path(source) | |
self.dest = Path(dest) | |
self.log_file = Path(log_file) | |
self.keep_filename = keep_filename | |
self.merge_spec = merge_spec | |
self.dry_run = dry_run | |
self.append = append | |
self.start_no = 1 # gets overridden later if append != None | |
def get_folder_path(self): | |
dirs = sorted(self.dest.iterdir()) | |
if not dirs: | |
return self.dest / '01' | |
# Quick check | |
for i, folder in enumerate(dirs, start=1): | |
assert f'{i:02d}' == folder.name, 'Folders not in sequential order' | |
if i >= MAX_DIRS: | |
logging.error('Maximal number of folders reached.' | |
' Cannot add music.') | |
# TODO: Raise proper exception | |
return None | |
# Append to existing folder? | |
path = None | |
if self.append == -1: | |
path = dirs[-1] | |
elif self.append: | |
path = dirs[self.append - 1] # Convert to 0-based index | |
if path: | |
logging.debug(f'Append to path {path}.') | |
files = sorted(path.glob('*.mp3')) | |
last_file = files[-1] | |
self.start_no = int(last_file.name[0:3]) + 1 # number part | |
logging.debug(f'Start with file number {self.start_no}.') | |
return path | |
# Or return a new one. | |
return self.dest / f'{i+1:02d}' | |
def get_file_path(self, counter, source_file=None): | |
if self.keep_filename and source_file: | |
base = source_file.stem.replace(' ', '_') | |
return self.folder_path / f'{counter:03d}_{base}.mp3' | |
else: | |
return self.folder_path / f'{counter:03d}.mp3' | |
def copy(self): | |
self.folder_path = self.get_folder_path() | |
if not self.folder_path.is_dir() and not self.dry_run: | |
logging.debug(f'Create output directory {self.folder_path}.') | |
self.folder_path.mkdir() | |
source_files = [f for f in sorted(self.source.iterdir()) | |
if f.suffix.lower() in ALLOWED_EXTS] | |
if self.log_file and not self.dry_run: | |
with self.log_file.open('a') as log_file: | |
log_file.write(f'{self.source} ==> {self.folder_path}\n') | |
if not self.merge_spec: | |
self._copy_simple(source_files) | |
else: | |
self._copy_merge(source_files) | |
def _copy_simple(self, source_files): | |
"""Simple 1:1 copying/encoding""" | |
for i, source_file in enumerate(source_files, start=self.start_no): | |
dest_file = self.get_file_path(i, source_file) | |
if source_file.suffix.lower() == '.mp3': | |
# Don’t recode, just copy | |
logging.info(f'Copy {source_file} to {dest_file}.') | |
if not self.dry_run: | |
shutil.copy(source_file, dest_file) | |
else: | |
logging.info(f'Recode {source_file} to {dest_file}.') | |
command = ['ffmpeg', '-i', source_file, | |
'-map_metadata', '0:s:0', | |
'-q:a', str(AUDIO_QUALITY), | |
dest_file] | |
logging.debug(f'Running command {command}') | |
if not self.dry_run: | |
subprocess.run(command, stderr=subprocess.DEVNULL) | |
def _copy_merge(self, source_files): | |
"""Merging tracks while copying/encoding""" | |
for i, track_group in enumerate(self.merge_spec, start=self.start_no): | |
dest_file = self.get_file_path(i) | |
logging.info(f'Recode tracks {track_group} to {dest_file}.') | |
# TODO: Handle non-recoding case | |
# https://trac.ffmpeg.org/wiki/Concatenate | |
command = ['ffmpeg'] | |
for track in track_group: | |
j = track - 1 # merge spec uses 1 based indexing | |
command.extend(['-i', source_files[j]]) | |
command.extend(['-filter_complex', | |
_generate_filter_spec(len(track_group)), | |
'-map', '[outa]', | |
'-q:a', str(AUDIO_QUALITY), | |
dest_file, | |
]) | |
logging.debug(f'Running command {command}') | |
if not self.dry_run: | |
subprocess.run(command, stderr=subprocess.DEVNULL) | |
def _generate_filter_spec(n): | |
"""Generate an ffmpeg filter specification.""" | |
parts = [f'[{i}:a]' for i in range(n)] | |
parts.append(f'concat=n={n}:v=0:a=1') | |
parts.append('[outa]') | |
return ' '.join(parts) | |
def main(): | |
# Parse commandline arguments | |
arg_parser = argparse.ArgumentParser() | |
arg_parser.add_argument('-v', '--verbose', action='store_true') | |
arg_parser.add_argument('-n', '--dry-run', action='store_true') | |
arg_parser.add_argument('-o', '--outdir', default='Music') | |
arg_parser.add_argument('-l', '--log-file', default='music.log') | |
arg_parser.add_argument('-k', '--keep-filename', action='store_true') | |
arg_parser.add_argument('-m', '--merge-spec', nargs='+', default=None, | |
action=parse_merge_spec, | |
help='Merge multiple tracks, e.g. 1-3 4-5') | |
arg_parser.add_argument('-a', '--append', type=int, nargs='?', | |
const='-1', default=None) | |
arg_parser.add_argument('source') | |
args = arg_parser.parse_args() | |
# Set up logging | |
if args.verbose: | |
level = logging.DEBUG | |
else: | |
level = logging.ERROR | |
logging.basicConfig(level=level) | |
logging.debug(f'Merge spec: {args.merge_spec}') | |
copier = MusicCopier(source=args.source, | |
dest=args.outdir, | |
log_file=args.log_file, | |
keep_filename=args.keep_filename, | |
append=args.append, | |
merge_spec=args.merge_spec, | |
dry_run=args.dry_run) | |
copier.copy() | |
# Return exit value | |
return 0 | |
if __name__ == '__main__': | |
sys.exit(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment