Skip to content

Instantly share code, notes, and snippets.

@frederik-elwert
Created March 12, 2021 19:51
Show Gist options
  • Save frederik-elwert/1e41a04d317a58b69ddb7d1882e98a3f to your computer and use it in GitHub Desktop.
Save frederik-elwert/1e41a04d317a58b69ddb7d1882e98a3f to your computer and use it in GitHub Desktop.
Script for converting music or audio books into the format and directory structure required by TonUINO
#!/usr/bin/env python3
import sys
import argparse
import logging
import shutil
import subprocess
from pathlib import Path
# Upper bound on the number of destination folders; folders are named with
# two digits ('01', '02', ...), and no new folder is created beyond this count.
MAX_DIRS = 99
# Source file suffixes that are picked up for copying; compared against the
# lower-cased suffix of each source file.
ALLOWED_EXTS = ['.mp3', '.ogg', '.oga', '.flac']
# Value passed to ffmpeg's `-q:a` option whenever a file is recoded.
AUDIO_QUALITY = 4
class parse_merge_spec(argparse.Action):
    """Argparse action that parses the merge-spec option.

    Each value is either a single 1-based track number (``7``) or an
    inclusive range (``1-3``).  The parsed result, a list of lists of track
    numbers, is stored on the namespace under the option's destination.

    Example::

        parser.add_argument('-m', nargs='+', action=parse_merge_spec)
        parser.parse_args(['-m', '1-3', '4-5']).m
        # -> [[1, 2, 3], [4, 5]]
    """

    def __call__(self, parser, namespace, values, option_string=None):
        """Convert the raw spec strings and store them on *namespace*.

        Raises:
            argparse.ArgumentError: If a spec is not of the form ``N`` or
                ``N-M``, if a track number is smaller than 1, or if a range
                runs backwards.  When raised during ``parse_args`` argparse
                reports this as a normal usage error.
        """
        groups = []
        for spec in values:
            try:
                if '-' in spec:
                    # Track range; unpacking also rejects specs like '1-2-3'.
                    start, end = (int(part) for part in spec.split('-'))
                else:
                    # Single track
                    start = end = int(spec)
            except ValueError:
                raise argparse.ArgumentError(
                    self,
                    f'invalid track spec {spec!r}: expected N or N-M',
                ) from None
            # Track numbers are 1-based; 0 would later index the last file,
            # and a reversed range would yield an empty group.
            if start < 1 or end < start:
                raise argparse.ArgumentError(
                    self,
                    f'invalid track spec {spec!r}: tracks start at 1 and '
                    'ranges must be ascending',
                )
            groups.append(list(range(start, end + 1)))
        setattr(namespace, self.dest, groups)
class MusicCopier:
    """Copy music from a source folder into a numbered folder on the SD card.

    Source files whose suffix is listed in ``ALLOWED_EXTS`` are written as
    ``NNN.mp3`` (or ``NNN_<name>.mp3``) into a two-digit folder below *dest*.
    MP3 files are copied unchanged; all other formats are recoded by calling
    ``ffmpeg``.

    Args:
        source: Directory containing the audio files to copy.
        dest: Directory holding the numbered folders (``01``, ``02``, ...).
        log_file: Optional file to which a ``source ==> folder`` line is
            appended on every run; ``None`` disables this log.
        keep_filename: Append the original file stem to the track number.
        append: ``-1`` to append to the last existing folder, a positive
            1-based folder number to append to that folder, or ``None``/``0``
            to create a new folder.
        merge_spec: Optional list of lists of 1-based track numbers; every
            inner list is merged into a single output file.
        dry_run: Only log what would be done; create no folders or files and
            do not run ffmpeg.
    """

    def __init__(self, source, dest, log_file=None, keep_filename=False,
                 append=None, merge_spec=None, dry_run=False):
        self.source = Path(source)
        self.dest = Path(dest)
        # Path(None) raises TypeError, so keep None as "no log file".
        self.log_file = Path(log_file) if log_file else None
        self.keep_filename = keep_filename
        self.merge_spec = merge_spec
        self.dry_run = dry_run
        self.append = append
        self.start_no = 1  # gets overridden later if appending
        self.folder_path = None  # set by copy()

    @staticmethod
    def _all_digits(text):
        """Return True if *text* is non-empty and made of ASCII digits only."""
        return bool(text) and all(ch in '0123456789' for ch in text)

    def get_folder_path(self):
        """Return the folder the tracks should be written to.

        Only two-digit numbered directories below ``dest`` are considered;
        other entries are ignored.  When appending, ``start_no`` is set to
        one more than the highest track number already in the folder.

        Raises:
            RuntimeError: If the numbered folders are not sequential, or if
                a new folder is needed but ``MAX_DIRS`` folders exist.
            ValueError: If ``append`` names a folder that does not exist.
        """
        dirs = sorted(
            entry for entry in self.dest.iterdir()
            if entry.is_dir()
            and len(entry.name) == 2
            and self._all_digits(entry.name)
        )
        if not dirs:
            return self.dest / '01'
        # Quick check (explicit raise: an assert would vanish under -O)
        for i, folder in enumerate(dirs, start=1):
            if folder.name != f'{i:02d}':
                raise RuntimeError('Folders not in sequential order')
        # Append to existing folder?
        if self.append:
            index = len(dirs) if self.append == -1 else self.append
            if not 1 <= index <= len(dirs):
                raise ValueError(
                    f'Cannot append to folder {self.append}: '
                    f'only {len(dirs)} folder(s) exist.')
            path = dirs[index - 1]  # Convert to 0-based index
            logging.debug(f'Append to path {path}.')
            # The first three characters of a track name are its number.
            numbers = [int(f.name[:3]) for f in path.glob('*.mp3')
                       if self._all_digits(f.name[:3])]
            # An empty folder simply starts at track 1.
            self.start_no = max(numbers, default=0) + 1
            logging.debug(f'Start with file number {self.start_no}.')
            return path
        # Or return a new one, unless the folder limit is reached.
        if len(dirs) >= MAX_DIRS:
            message = 'Maximal number of folders reached. Cannot add music.'
            logging.error(message)
            raise RuntimeError(message)
        return self.dest / f'{len(dirs) + 1:02d}'

    def get_file_path(self, counter, source_file=None):
        """Return the destination path for track number *counter*.

        With ``keep_filename`` set and a *source_file* given, the source
        stem (spaces replaced by underscores) is appended to the number.
        """
        if self.keep_filename and source_file:
            base = source_file.stem.replace(' ', '_')
            return self.folder_path / f'{counter:03d}_{base}.mp3'
        return self.folder_path / f'{counter:03d}.mp3'

    def copy(self):
        """Copy or recode all supported files from ``source``.

        Determines the target folder, creates it if necessary, appends a
        line to the log file (if configured) and then copies either 1:1 or
        according to ``merge_spec``.
        """
        self.folder_path = self.get_folder_path()
        if not self.folder_path.is_dir() and not self.dry_run:
            logging.debug(f'Create output directory {self.folder_path}.')
            self.folder_path.mkdir()
        source_files = [f for f in sorted(self.source.iterdir())
                        if f.suffix.lower() in ALLOWED_EXTS]
        if self.log_file and not self.dry_run:
            with self.log_file.open('a') as log_file:
                log_file.write(f'{self.source} ==> {self.folder_path}\n')
        if not self.merge_spec:
            self._copy_simple(source_files)
        else:
            self._copy_merge(source_files)

    def _run_ffmpeg(self, command):
        """Run an ffmpeg command unless in dry-run mode; log failures."""
        logging.debug(f'Running command {command}')
        if self.dry_run:
            return
        result = subprocess.run(command, stderr=subprocess.DEVNULL)
        if result.returncode != 0:
            # ffmpeg's own output is discarded, so report the failure here.
            logging.error(f'ffmpeg exited with status {result.returncode} '
                          f'while writing {command[-1]}.')

    def _copy_simple(self, source_files):
        """Simple 1:1 copying/encoding"""
        for i, source_file in enumerate(source_files, start=self.start_no):
            dest_file = self.get_file_path(i, source_file)
            if source_file.suffix.lower() == '.mp3':
                # Don’t recode, just copy
                logging.info(f'Copy {source_file} to {dest_file}.')
                if not self.dry_run:
                    shutil.copy(source_file, dest_file)
            else:
                logging.info(f'Recode {source_file} to {dest_file}.')
                # '-map_metadata 0:s:0' takes the metadata from the first
                # stream of the input file.
                command = ['ffmpeg', '-i', source_file,
                           '-map_metadata', '0:s:0',
                           '-q:a', str(AUDIO_QUALITY),
                           dest_file]
                self._run_ffmpeg(command)

    def _copy_merge(self, source_files):
        """Merging tracks while copying/encoding

        Raises:
            IndexError: If the merge spec references a track number outside
                ``1..len(source_files)``.
        """
        # Validate everything up front so no partial output is produced.
        for track_group in self.merge_spec:
            for track in track_group:
                if not 1 <= track <= len(source_files):
                    raise IndexError(
                        f'Merge spec references track {track}, but only '
                        f'{len(source_files)} source file(s) were found.')
        for i, track_group in enumerate(self.merge_spec, start=self.start_no):
            dest_file = self.get_file_path(i)
            logging.info(f'Recode tracks {track_group} to {dest_file}.')
            # TODO: Handle non-recoding case
            # https://trac.ffmpeg.org/wiki/Concatenate
            command = ['ffmpeg']
            for track in track_group:
                j = track - 1  # merge spec uses 1 based indexing
                command.extend(['-i', source_files[j]])
            command.extend(['-filter_complex',
                            _generate_filter_spec(len(track_group)),
                            '-map', '[outa]',
                            '-q:a', str(AUDIO_QUALITY),
                            dest_file,
                            ])
            self._run_ffmpeg(command)
def _generate_filter_spec(n):
"""Generate an ffmpeg filter specification."""
parts = [f'[{i}:a]' for i in range(n)]
parts.append(f'concat=n={n}:v=0:a=1')
parts.append('[outa]')
return ' '.join(parts)
def main():
    """Command-line entry point.

    Parses the options, configures logging (DEBUG with ``--verbose``,
    otherwise ERROR), builds a ``MusicCopier`` from the options and runs
    it.  Returns ``0`` as the process exit value.
    """
    # Command-line interface
    cli = argparse.ArgumentParser()
    add = cli.add_argument
    add('-v', '--verbose', action='store_true')
    add('-n', '--dry-run', action='store_true')
    add('-o', '--outdir', default='Music')
    add('-l', '--log-file', default='music.log')
    add('-k', '--keep-filename', action='store_true')
    add('-m', '--merge-spec', nargs='+', default=None,
        action=parse_merge_spec,
        help='Merge multiple tracks, e.g. 1-3 4-5')
    # A bare `-a` (no number) yields the const value.
    add('-a', '--append', type=int, nargs='?', const='-1', default=None)
    add('source')
    options = cli.parse_args()

    # Logging verbosity
    logging.basicConfig(
        level=logging.DEBUG if options.verbose else logging.ERROR)
    logging.debug(f'Merge spec: {options.merge_spec}')

    # Run the copy job
    MusicCopier(
        source=options.source,
        dest=options.outdir,
        log_file=options.log_file,
        keep_filename=options.keep_filename,
        append=options.append,
        merge_spec=options.merge_spec,
        dry_run=options.dry_run,
    ).copy()

    # Exit value
    return 0
# Run only when executed as a script; main()'s return value becomes the
# process exit status.
if __name__ == '__main__':
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment